diff --git a/slo-violation-detector/README.md b/slo-violation-detector/README.md index 9496eaf..36bb47c 100644 --- a/slo-violation-detector/README.md +++ b/slo-violation-detector/README.md @@ -106,10 +106,12 @@ then an SLO violation should be triggered. The component can be built using Maven (`mvn clean install -Dtest=!UnboundedMonitoringAttributeTests`). This command should succeed without errors, and verifying that all tests (except for the Unbounded monitoring attribute tests) are successfully executed. Then, any of the produced jar files (either the shaded or non-shaded version) can be run using the following command: -`java -jar ` +`java -jar ` When the component starts correctly it will not display any error logs, and it may also display that it listens for events on the topic in which SLO rules are to be specified (by default **metrics.metric_list**). +It is not mandatory to specify the or the but the defaults will be assumed (the location of the configuration file will be based on the Constants.java class and the role will be OperationalMode.DIRECTOR ) + When debugging/developing, the component can be started from the Java main method which is located inside the src/runtime/Main.java file. ### Testing process @@ -134,6 +136,11 @@ To test the functionality of the component - provided that a working ActiveMQ Br To illustrate, in the case that an SLO message identical to the simple SLO example is sent at step 1, then monitoring messages should be sent in step 2 to the `cpu_usage` topic and predicted monitoring messages should be sent in step 3 to the `prediction.cpu_usage` topic. Finally, SLO violations will be announced at the `prediction.slo_severity_value` topic. +### Development + +Starting new threads in the SLO Violation Detection component should only be done using the CharacterizedThread class, as opposed to using plain Threads - to reassure that Threads are being defined in a way which permits their appropriate management (registration/removal). + + ### Docker container build To run the component in Dockerized form, it is sufficient to build the Dockerfile which is included at the root of the project. When running the docker container, the configuration file which will be used is the `src/main/resources/config/eu.morphemic.slo_violation_detector.properties` file, relative to the root of the project (this location specified as a variable in the `configuration.Constants` class). If another configuration file needs to be used, then it should be mounted over the `/home/src/main/resources/config/eu.morphemic.slo_violation_detector.properties` location. diff --git a/slo-violation-detector/pom.xml b/slo-violation-detector/pom.xml index 2a092b9..e935419 100644 --- a/slo-violation-detector/pom.xml +++ b/slo-violation-detector/pom.xml @@ -9,33 +9,16 @@ 4.0-SNAPSHOT - - org.apache.maven.plugins - maven-shade-plugin - 3.2.4 - - - package - - shade - - - - - runtime.Main - - - - - + org.springframework.boot + spring-boot-maven-plugin org.apache.maven.plugins maven-compiler-plugin - 11 - 11 + 17 + 17 3.8.1 @@ -65,8 +48,33 @@ + + org.springframework.boot + spring-boot-starter-parent + 3.1.5 + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-starter-web + 3.1.2 + + + org.springframework.boot + spring-boot-starter-tomcat + + gr.ntua.imu.morphemic amq-message-java-library 4.0.0-SNAPSHOT @@ -76,13 +84,6 @@ json-simple 1.1 - - - org.projectlombok - lombok - 1.18.30 - provided - @@ -97,25 +98,15 @@ 5.16.1 - - org.slf4j - slf4j-api - 1.8.0-beta4 - - - org.springframework - spring-context - 5.3.3 - - + org.apache.commons commons-collections4 - 4.2 + 4.4 org.apache.commons diff --git a/slo-violation-detector/src/main/java/configuration/Constants.java b/slo-violation-detector/src/main/java/configuration/Constants.java index b67cfd1..53bff12 100644 --- a/slo-violation-detector/src/main/java/configuration/Constants.java +++ b/slo-violation-detector/src/main/java/configuration/Constants.java @@ -36,6 +36,8 @@ public class Constants { public static String configuration_file_location = "src/main/resources/config/input_data.properties"; public static String amq_library_configuration_location = "src/main/resources/config/eu.melodic.event.brokerclient.properties"; public static String topic_for_severity_announcement = "prediction.slo_severity_value"; + public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost"; + public static String slo_rules_topic = "metrics.metric_list"; public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation public static String roc_calculation_mode = "prototype"; diff --git a/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java b/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java index afa1343..a5dd95c 100644 --- a/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java +++ b/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java @@ -18,6 +18,7 @@ import org.json.simple.parser.ParseException; import runtime.Main; import slo_processing.SLORule; import slo_processing.SLOSubRule; +import utility_beans.CharacterizedThread; import utility_beans.PredictedMonitoringAttribute; import utility_beans.RealtimeMonitoringAttribute; @@ -28,13 +29,13 @@ import java.util.logging.Logger; import static configuration.Constants.*; import static runtime.Main.*; +import static utilities.SLOViolationDetectorStateUtils.*; +import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread; import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value; public class AttributeSubscription { SLORule slo_rule; - private Thread realtime_subscription_thread, forecasted_subscription_thread; - public AttributeSubscription(SLORule slo_rule, String broker_ip_address, String broker_username, String broker_password){ this.slo_rule = slo_rule; @@ -59,9 +60,9 @@ public class AttributeSubscription { } return message; }; - realtime_subscription_thread = new Thread(() -> { + Runnable realtime_subscription_runnable = () -> { try { - subscriber.subscribe(function, Main.stop_signal); + subscriber.subscribe(function, stop_signal); if(Thread.interrupted()){ throw new InterruptedException(); } @@ -72,11 +73,10 @@ public class AttributeSubscription { } }finally{ Logger.getAnonymousLogger().log(info_logging_level,"Removing realtime subscriber thread for "+realtime_metric_topic_name); - running_threads.remove("realtime_subscriber_thread_" + realtime_metric_topic_name); + slo_bound_running_threads.remove("realtime_subscriber_thread_" + realtime_metric_topic_name); } - }); - running_threads.put("realtime_subscriber_thread_"+realtime_metric_topic_name,realtime_subscription_thread); - realtime_subscription_thread.start(); + }; + CharacterizedThread.create_new_thread(realtime_subscription_runnable,"realtime_subscriber_thread_"+realtime_metric_topic_name,slo_bound_running_thread,true); @@ -137,11 +137,11 @@ public class AttributeSubscription { ADAPTATION_TIMES_MODIFY.setValue(true); ADAPTATION_TIMES_MODIFY.notifyAll(); } - synchronized (Main.can_modify_slo_rules) { - while(!Main.can_modify_slo_rules.getValue()) { - Main.can_modify_slo_rules.wait(); + synchronized (can_modify_slo_rules) { + while(!can_modify_slo_rules.getValue()) { + can_modify_slo_rules.wait(); } - Main.can_modify_slo_rules.setValue(false); + can_modify_slo_rules.setValue(false); for (SLOSubRule subrule : SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(predicted_attribute_name)) { //Get the subrules which are associated to the monitoring attribute which is predicted, and perform the following processing to each one of them getPredicted_monitoring_attributes().computeIfAbsent(subrule.getId(), k -> new HashMap<>()); @@ -173,12 +173,11 @@ public class AttributeSubscription { return message; }; - - forecasted_subscription_thread = new Thread(() -> { + Runnable forecasted_subscription_runnable = () -> { try { - synchronized (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) { + synchronized (HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) { //if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name).getValue()) - forecasted_subscriber.subscribe(forecasted_function,Main.stop_signal); + forecasted_subscriber.subscribe(forecasted_function,stop_signal); } if (Thread.interrupted()) { throw new InterruptedException(); @@ -190,12 +189,10 @@ public class AttributeSubscription { } }finally { Logger.getAnonymousLogger().log(info_logging_level,"Removing forecasting subscriber thread for "+forecasted_metric_topic_name); - running_threads.remove("forecasting_subscriber_thread_"+forecasted_metric_topic_name); + slo_bound_running_threads.remove("forecasting_subscriber_thread_"+forecasted_metric_topic_name); } - }); - running_threads.put("forecasting_subscriber_thread_"+forecasted_metric_topic_name,forecasted_subscription_thread); - forecasted_subscription_thread.start(); - + }; + CharacterizedThread.create_new_thread(forecasted_subscription_runnable, "forecasting_subscriber_thread_" + forecasted_metric_topic_name, slo_bound_running_thread, true); } } } diff --git a/slo-violation-detector/src/main/java/processing_logic/Runnables.java b/slo-violation-detector/src/main/java/processing_logic/Runnables.java new file mode 100644 index 0000000..3acb1a0 --- /dev/null +++ b/slo-violation-detector/src/main/java/processing_logic/Runnables.java @@ -0,0 +1,155 @@ +package processing_logic; + +import eu.melodic.event.brokerclient.BrokerPublisher; +import org.json.simple.JSONObject; +import slo_processing.SLORule; +import utility_beans.CharacterizedThread; + +import java.io.IOException; +import java.sql.Timestamp; +import java.time.Clock; +import java.util.Date; +import java.util.NoSuchElementException; +import java.util.logging.Logger; + +import static configuration.Constants.*; +import static java.lang.Thread.sleep; +import static runtime.Main.run_slo_violation_detection_engine; +import static slo_processing.SLORule.process_rule_value; +import static utilities.SLOViolationDetectorStateUtils.*; +import static utilities.SLOViolationDetectorUtils.*; +import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread; + +public class Runnables { + public static Runnable slo_detector_mode_runnable = () -> { + try { + run_slo_violation_detection_engine(); + } catch (IOException e) { + e.printStackTrace(); + } + }; + + public static Runnable get_severity_calculation_runnable(SLORule rule) { + + Runnable severity_calculation_runnable = () -> { + BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + + while (!stop_signal.get()) { + /*try { + Thread.sleep(time_horizon_seconds*1000); + } catch (InterruptedException e) { + e.printStackTrace(); + }*/ + synchronized (PREDICTION_EXISTS) { + while (!PREDICTION_EXISTS.getValue()) { + try { + PREDICTION_EXISTS.wait(); + } catch (InterruptedException e) { + synchronized (stop_signal) { + if (stop_signal.get()) { + slo_bound_running_threads.remove("severity_calculation_thread_" + rule.toString()); + PREDICTION_EXISTS.setValue(false); + return; + } + } + e.printStackTrace(); + } + } + + } + try { + Clock clock = Clock.systemUTC(); + Long current_time = clock.millis(); + Long targeted_prediction_time; + synchronized (ADAPTATION_TIMES_MODIFY) { + while (!ADAPTATION_TIMES_MODIFY.getValue()) { + try { + ADAPTATION_TIMES_MODIFY.wait(); + } catch (InterruptedException e) { + Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object"); + e.printStackTrace(); + } + } + ADAPTATION_TIMES_MODIFY.setValue(false); + clean_data(adaptation_times_to_remove); + //targeted_prediction_time = adaptation_times.stream().min(Long::compare).get(); + targeted_prediction_time = get_next_targeted_prediction_time(); + ADAPTATION_TIMES_MODIFY.setValue(true); + ADAPTATION_TIMES_MODIFY.notifyAll(); + } + if (targeted_prediction_time == null) { + continue; + } + Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time); + Runnable internal_severity_calculation_runnable = () -> { + try { + /* + synchronized (ADAPTATION_TIMES_MODIFY) { + while (!ADAPTATION_TIMES_MODIFY.getValue()) { + ADAPTATION_TIMES_MODIFY.wait(); + } + ADAPTATION_TIMES_MODIFY.setValue(false); + adaptation_times.remove(targeted_prediction_time);//remove from the list of timepoints which should be processed. Later this timepoint will be added to the adaptation_times_to_remove HashSet to remove any data associated with it + ADAPTATION_TIMES_MODIFY.setValue(true); + ADAPTATION_TIMES_MODIFY.notifyAll(); + } + //adaptation_times_pending_processing.add(targeted_prediction_time); + + */ + synchronized (PREDICTION_EXISTS) { + PREDICTION_EXISTS.setValue(adaptation_times.size() > 0); + } + + Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time; + if (sleep_time <= 0) { + Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000); + return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete) + } else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) { + Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)"); + return; //The predictions are too near to the targeted reconfiguration tim + } + Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds"); + sleep(sleep_time); + double rule_severity = process_rule_value(rule.getRule_representation(), targeted_prediction_time, rule.getRule_format()); + double slo_violation_probability = determine_slo_violation_probability(rule_severity); + Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand"); + Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability < slo_violation_probability_threshold ? " so it will not be published" : " and it will be published")); + + if (slo_violation_probability >= slo_violation_probability_threshold) { + JSONObject severity_json = new JSONObject(); + severity_json.put("severity", rule_severity); + severity_json.put("probability", slo_violation_probability); + severity_json.put("predictionTime", targeted_prediction_time); + persistent_publisher.publish(severity_json.toJSONString()); + } + + slo_violation_event_recording_queue.add(System.currentTimeMillis()); + + //Probably not necessary to synchronize the line below as each removal will happen only once in a reconfiguration interval, and reconfiguration intervals are assumed to have a duration of minutes. + //Necessary to synchronize because another severity calculation thread might invoke clean_data above, and then a concurrent modification exception may arise + synchronized (ADAPTATION_TIMES_MODIFY) { + while (!ADAPTATION_TIMES_MODIFY.getValue()) { + ADAPTATION_TIMES_MODIFY.wait(); + } + ADAPTATION_TIMES_MODIFY.setValue(false); + adaptation_times_to_remove.add(targeted_prediction_time); //This line serves a different purpose from the adaptation_times.remove(...) directive above, as the adaptation_times_to_remove HashSet contains timepoints which should be processed to delete their data. + adaptation_times_pending_processing.remove(targeted_prediction_time); + ADAPTATION_TIMES_MODIFY.setValue(true); + ADAPTATION_TIMES_MODIFY.notifyAll(); + } + } catch (InterruptedException i) { + Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping..."); + return; + } + }; + CharacterizedThread.create_new_thread(internal_severity_calculation_runnable, "internal_severity_calculation_thread_" + targeted_prediction_time, slo_bound_running_thread, true); + } catch (NoSuchElementException n) { + Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing..."); + continue; + } + } + slo_bound_running_threads.remove("severity_calculation_thread_" + rule.toString()); + }; + return severity_calculation_runnable; + } +} diff --git a/slo-violation-detector/src/main/java/runtime/DebugMainClass.java b/slo-violation-detector/src/main/java/runtime/DebugMainClass.java new file mode 100644 index 0000000..66c1be9 --- /dev/null +++ b/slo-violation-detector/src/main/java/runtime/DebugMainClass.java @@ -0,0 +1,719 @@ +/* + * Copyright (c) 2023 Institute of Communication and Computer Systems + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +package runtime; +import eu.melodic.event.brokerclient.BrokerPublisher; +import eu.melodic.event.brokerclient.BrokerSubscriber; +import metric_retrieval.AttributeSubscription; +import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import slo_processing.SLORule; +import slo_processing.SLOSubRule; +import utilities.DebugDataSubscription; +import utilities.MathUtils; +import utilities.MonitoringAttributeUtilities; +import utility_beans.*; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.time.Clock; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import static configuration.Constants.*; +import static java.lang.Thread.sleep; +import static slo_processing.SLORule.process_rule_value; +import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; +import static utility_beans.RealtimeMonitoringAttribute.getMonitoring_attributes_bounds_representation; + +public class DebugMainClass { + public static final AtomicBoolean stop_signal = new AtomicBoolean(false); + public static final SynchronizedBoolean PREDICTION_EXISTS = new SynchronizedBoolean(false); + public static final SynchronizedBoolean ADAPTATION_TIMES_MODIFY = new SynchronizedBoolean(true); + public static SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap(); + public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap(); + public static ArrayList slo_rules = new ArrayList<>(); + public static HashMap running_threads = new HashMap<>(); + public static HashSet adaptation_times = new HashSet<>(); + public static HashSet adaptation_times_pending_processing = new HashSet<>(); + private static HashSet adaptation_times_to_remove = new HashSet<>(); + public static Long last_processed_adaptation_time = -1L;//initialization + public static Long current_slo_rules_version = -1L;//initialization + public static final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false); + public static final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false); + + //Debugging variables + public static CircularFifoQueue slo_violation_event_recording_queue = new CircularFifoQueue<>(50); + public static CircularFifoQueue severity_calculation_event_recording_queue = new CircularFifoQueue<>(50); + private static Properties prop = new Properties(); + + public static void main(String[] args) { + + //The input of this program is the type of the SLO violations which are monitored, and the predicted metric values which are evaluated. Its output are events which produce an estimate of the probability of an adaptation. + //The SLO violations which are monitored need to mention the following data: + // - The name of the predicted metrics which are monitored as part of the rule + // - The threshold and whether it is a more-than or less-than threshold + //The predicted metrics need to include the following data: + // - The predicted value + // - The prediction confidence + + //During 'normal' execution, business code starts being executed inside the while loop in the try-catch block, and more specifically, code in the 'else' part of the first *if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_VALUES_INPUT)* statement, specifically inside the 'else' part, of the second *if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_TOPICS_INPUT)* statement. This code initializes a subscriber to the topic where rules are expected to be received. This debug class includes also other options for the initial startup of the component. + + try { + InputStream inputStream = null; + if (args.length == 0) { + base_project_path = new File("").toURI(); + URI absolute_configuration_file_path = new File(configuration_file_location).toURI(); + URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path); + Logger.getAnonymousLogger().log(info_logging_level,"This is the base project path:"+base_project_path); + inputStream = new FileInputStream(base_project_path.getPath()+relative_configuration_file_path); + } else { + Logger.getAnonymousLogger().log(info_logging_level, "A preferences file has been manually specified"); + + if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) { + base_project_path = new File(args[0]).toURI(); + } + inputStream = new FileInputStream(base_project_path.getPath()); + } + prop.load(inputStream); + String slo_rules_topic = prop.getProperty("slo_rules_topic"); + kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric","5")); + self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file")); + single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active")); + time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds")); + + slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold")); + slo_violation_determination_method = prop.getProperty("slo_violation_determination_method"); + maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions")); + ArrayList unbounded_metric_strings = new ArrayList(Arrays.asList(prop.getProperty("metrics_bounds").split(","))); + for (String metric_string : unbounded_metric_strings){ + getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";",2)[1]); + } + + while (true) { + + if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_VALUES_INPUT) { + + String json_file_name = prop.getProperty("input_file"); + slo_violation_determination_method = prop.getProperty("slo_violation_determination_method"); + confidence_interval = Double.parseDouble(prop.getProperty("confidence_interval")); + prediction_certainty = Double.parseDouble(prop.getProperty("prediction_certainty")); + + ArrayList metric_names = new ArrayList<>() {{ + add("cpu"); + add("ram"); + add("bandwidth"); + add("disk"); + }}; + HashMap input_data = new HashMap<>(); + for (String metric_name : metric_names) { + + Double metric_input_data = MathUtils.get_average(new ArrayList<>(Arrays.stream(prop.getProperty(metric_name).split(",")).map(Double::parseDouble).collect(Collectors.toList()))); + + input_data.put(metric_name, metric_input_data); + } + + + RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names); + RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes(metric_names); + + RealtimeMonitoringAttribute.update_monitoring_attributes_values_map(input_data); + + //Parsing of file + String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); + Logger.getAnonymousLogger().log(info_logging_level, rules_json_string); + MESSAGE_CONTENTS.assign_value(slo_rules_topic, rules_json_string); + slo_rules.add(new SLORule(MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(","))))); + + } else { + if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_TOPICS_INPUT) { + synchronized (can_modify_slo_rules) { + //do { + // can_modify_slo_rules.wait(); + //}while(!can_modify_slo_rules.getValue()); + can_modify_slo_rules.setValue(false); + + slo_rules.add(new SLORule(MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(","))))); + can_modify_slo_rules.setValue(true); + slo_rule_arrived.set(true); + can_modify_slo_rules.notifyAll(); + } + } else if (first_run){ + + BiFunction function = (topic, message) -> { + synchronized (can_modify_slo_rules) { + can_modify_slo_rules.setValue(true); + MESSAGE_CONTENTS.assign_value(topic, message); + slo_rule_arrived.set(true); + can_modify_slo_rules.notifyAll(); + + Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic); + + } + return topic + ":MSG:" + message; + }; + + BrokerSubscriber subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + new Thread(() -> { + while (true) { + subscriber.subscribe(function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals + Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds"); + try { + Thread.sleep(10000); + }catch (InterruptedException i){ + Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker"); + } + } + }).start(); + + if (self_publish_rule_file) { + String json_file_name = prop.getProperty("input_file"); + String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); + BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + publisher.publish(rules_json_string); + Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string); + } + } + } + first_run = false; + synchronized (can_modify_slo_rules) { + do { + try { + can_modify_slo_rules.wait(); + }catch (InterruptedException i){ + i.printStackTrace(); + } + }while((!can_modify_slo_rules.getValue()) || (!slo_rule_arrived.get())); + can_modify_slo_rules.setValue(false); + slo_rule_arrived.set(false); + String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic); + if (slo_rule_arrived_has_updated_version(rule_representation)) { + if (single_slo_rule_active) { + slo_rules.clear(); + } + + ArrayList additional_metrics_from_new_slo = get_metric_list_from_JSON_slo(rule_representation); + + if (additional_metrics_from_new_slo.size() > 0) { + slo_rules.add(new SLORule(rule_representation, additional_metrics_from_new_slo)); + } + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + }else{ + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + continue; + } + + + + } + + stop_all_running_threads(); + DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password")); + initialize_monitoring_datastructures_with_empty_data(slo_rules); + // + initialize_subrule_and_attribute_associations(slo_rules); + initialize_attribute_subscribers(slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password")); + initialize_slo_processing(slo_rules); + + //while (true) { + + //} + } + }catch (Exception e){ + Logger.getAnonymousLogger().log(info_logging_level,"Problem reading input file"); + e.printStackTrace(); + } + } + + private static boolean slo_rule_arrived_has_updated_version(String rule_representation) { + JSONObject json_object = null; + long json_object_version = Integer.MAX_VALUE; + try { + json_object = (JSONObject) new JSONParser().parse(rule_representation); + json_object_version = (Long) json_object.get("version"); + } catch (NullPointerException n){ + n.printStackTrace(); + Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored"); + return false; + } catch (Exception e){ + e.printStackTrace(); + Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule..."); + return false; + } + if (json_object_version > current_slo_rules_version){ + Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived"); + current_slo_rules_version=json_object_version; + return true; + }else { + Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated"); + return false; + } + } + + private static void stop_all_running_threads() { + Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate"); + int initial_number_of_running_threads = running_threads.size(); + while (running_threads.size()>0) { + synchronized (stop_signal) { + stop_signal.set(true); + stop_signal.notifyAll(); + } + try { + Thread.sleep(3000); + running_threads.values().forEach(Thread::interrupt); + }catch (Exception e){ + } + Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads-running_threads.size())+"/"+initial_number_of_running_threads+" already running threads"); + if (running_threads.size()>1){ + Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+running_threads); + }else if (running_threads.size()>0){ + Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+running_threads); + } + } + Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated"); + synchronized (stop_signal) { + stop_signal.set(false); + } + synchronized (PREDICTION_EXISTS){ + PREDICTION_EXISTS.setValue(false); + } + adaptation_times.clear(); + } + + public static void initialize_subrule_and_attribute_associations(ArrayList slo_rules) { + synchronized (can_modify_slo_rules) { + while (!can_modify_slo_rules.getValue()){ + try { + can_modify_slo_rules.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + can_modify_slo_rules.setValue(false); + for (SLORule slo_rule : slo_rules) { + for (SLOSubRule subrule : SLORule.parse_subrules(slo_rule.getRule_representation(),slo_rule.getRule_format())) { + SLOSubRule.getSlo_subrules_per_monitoring_attribute().computeIfAbsent(subrule.getMetric(), k -> new ArrayList<>()); + SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(subrule.getMetric()).add(subrule); + } + } + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + } + } + + + private static void initialize_monitoring_datastructures_with_empty_data(ArrayList slo_rules){ + for(SLORule slo_rule: slo_rules){ + for (String metric_name : slo_rule.get_monitoring_attributes()) { + MonitoringAttributeUtilities.initialize_values(metric_name); + } + } + } + + public static ArrayList get_metric_list_from_JSON_slo(String json_object_string) { + HashSet metric_list = new HashSet<>(); + try { + JSONObject json_object = (JSONObject) new JSONParser().parse(json_object_string); + String json_object_id = (String) json_object.get("id"); + String json_object_name = (String) json_object.get("name"); + //Older format uses id-based fields, newer format uses a non-variable structure + //We first check if an event using the older format is sent, and then check if the event is sent using the newer format + if (json_object_id!=null) { + if (json_object_id.split("-").length > 1) { + //String composite_rule_type = json_object_id.split("-")[0]; + JSONArray internal_json_slos = (JSONArray) json_object.get(json_object_id); + for (Object o : internal_json_slos) { + JSONObject internal_json_slo = (JSONObject) o; + metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString())); + } + } else { + metric_list.add((String) json_object.get("attribute")); + } + } + //If newer format is used + else if (json_object_name!=null){ + JSONArray internal_json_slos = (JSONArray) json_object.get("constraints"); + if ((internal_json_slos!=null) && (internal_json_slos.size()>0)){ + for (Object o : internal_json_slos) { + JSONObject internal_json_slo = (JSONObject) o; + metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString())); + } + }else{ + metric_list.add((String) json_object.get("metric")); + } + }else{ + Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string); + } + }catch (Exception p){ + p.printStackTrace(); + return new ArrayList(); + } + return new ArrayList(metric_list); + } + + private static double get_metric_value_from_JSON(String data_arrived) { + double result = -1; + + JSONObject json_data_representation = render_valid_json(data_arrived,"="); + //JSONObject json_data_representation = (JSONObject) new JSONParser().parse(data_arrived); + result = Double.parseDouble((String)json_data_representation.get("metricValue")); + + return result; + } + + /** + * This method replaces any invalid characters in the json which is received from the broker, and creates a valid JSON object + * @param data_arrived The data which is received from the broker + * @param string_to_replace The invalid character which should be substituted by an 'equals' sign + * @return A JSON object + */ + private static JSONObject render_valid_json(String data_arrived, String string_to_replace) { + String valid_json_string = new String(data_arrived); + JSONObject json_object = new JSONObject(); + valid_json_string = valid_json_string.replaceAll(string_to_replace,":"); + + valid_json_string = valid_json_string.replaceAll("[{}]",""); + + String [] json_elements = valid_json_string.split(","); + List json_elements_list = Arrays.stream(json_elements).map(String::trim).collect(Collectors.toList()); + + for (String element: json_elements_list) { + json_object.put(element.split(":")[0],element.split(":")[1]); + } + + return json_object; + } + + + private static void initialize_global_prediction_attribute_data(){ + Logger.getAnonymousLogger().log(warning_logging_level,"Get global prediction attribute data needs implementation"); + } + + + private static PredictionAttributeSet get_prediction_attribute_set(ArrayList rules){ + //usedglobalHashmap: attribute_data, + return null; + } + + private static PredictionAttributeSet initialize_with_existing_values(Double cpu_value, Double ram_value) { + ArrayList metric_names = new ArrayList<>(){{ + add("cpu"); + add("ram"); + add("hard_disk"); + }}; + RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names); + RealtimeMonitoringAttribute.update_monitoring_attribute_value("cpu",cpu_value); + RealtimeMonitoringAttribute.update_monitoring_attribute_value("ram",ram_value); + + PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); + PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); + + + PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}}); + + return predictionAttributeSet; + } + private static PredictionAttributeSet pseudo_initialize() throws Exception { + ArrayList metric_names = new ArrayList<>(){{ + add("cpu"); + add("ram"); + add("hard_disk"); + }}; + RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names); + + //initial cpu values + ArrayList cpu_values = new ArrayList<>(); + cpu_values.add(10.0); + cpu_values.add(20.0); + cpu_values.add(30.0); + cpu_values.add(40.0); + cpu_values.add(50.0); + cpu_values.add(40.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + cpu_values.add(50.0); + + MonitoringAttributeUtilities.initialize_values("cpu",MathUtils.get_average(cpu_values)); + + //initial ram values + ArrayList ram_values = new ArrayList<>(); + ram_values.add(20.0); + ram_values.add(20.0); + ram_values.add(25.0); + ram_values.add(45.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(40.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + ram_values.add(30.0); + MonitoringAttributeUtilities.initialize_values("ram",MathUtils.get_average(ram_values)); + + //Get prediction_attribute_sets and calculate method 1 on top of them + //Get individual prediction_attributes and calculate method 2 on top of them + + PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); + PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); + + + PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}}); + + return predictionAttributeSet; + } + + private static ArrayList initialize_attribute_subscribers(ArrayList rules_list, String broker_ip_address, String broker_username, String broker_password){ + ArrayList attribute_subscribers = new ArrayList<>(); + for (SLORule rule:rules_list){ + attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password)); + } + return attribute_subscribers; + } + + public static void initialize_slo_processing(ArrayList rules_list){ + + for (SLORule rule:rules_list) { + + Thread severity_calculation_thread = new Thread(() -> { + BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + + while (!stop_signal.get()) { + /*try { + Thread.sleep(time_horizon_seconds*1000); + } catch (InterruptedException e) { + e.printStackTrace(); + }*/ + synchronized (PREDICTION_EXISTS) { + while (!PREDICTION_EXISTS.getValue()) { + try { + PREDICTION_EXISTS.wait(); + } catch (InterruptedException e) { + synchronized (stop_signal) { + if (stop_signal.get()) { + running_threads.remove("severity_calculation_thread_" + rule.toString()); + PREDICTION_EXISTS.setValue(false); + return; + } + } + e.printStackTrace(); + } + } + + } + try { + Clock clock = Clock.systemUTC(); + Long current_time = clock.millis(); + Long targeted_prediction_time; + synchronized (ADAPTATION_TIMES_MODIFY) { + while (!ADAPTATION_TIMES_MODIFY.getValue()) { + try { + ADAPTATION_TIMES_MODIFY.wait(); + } catch (InterruptedException e) { + Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object"); + e.printStackTrace(); + } + } + ADAPTATION_TIMES_MODIFY.setValue(false); + clean_data(adaptation_times_to_remove); + //targeted_prediction_time = adaptation_times.stream().min(Long::compare).get(); + targeted_prediction_time = get_next_targeted_prediction_time(); + ADAPTATION_TIMES_MODIFY.setValue(true); + ADAPTATION_TIMES_MODIFY.notifyAll(); + } + if (targeted_prediction_time==null){ + continue; + } + Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time); + Thread internal_severity_calculation_thread = new Thread(() -> { + try { + /* + synchronized (ADAPTATION_TIMES_MODIFY) { + while (!ADAPTATION_TIMES_MODIFY.getValue()) { + ADAPTATION_TIMES_MODIFY.wait(); + } + ADAPTATION_TIMES_MODIFY.setValue(false); + adaptation_times.remove(targeted_prediction_time);//remove from the list of timepoints which should be processed. Later this timepoint will be added to the adaptation_times_to_remove HashSet to remove any data associated with it + ADAPTATION_TIMES_MODIFY.setValue(true); + ADAPTATION_TIMES_MODIFY.notifyAll(); + } + //adaptation_times_pending_processing.add(targeted_prediction_time); + + */ + synchronized (PREDICTION_EXISTS) { + PREDICTION_EXISTS.setValue(adaptation_times.size() > 0); + } + + Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time; + if (sleep_time <= 0) { + Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000); + return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete) + } else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) { + Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)"); + return; //The predictions are too near to the targeted reconfiguration tim + } + Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds"); + sleep(sleep_time); + double rule_severity = process_rule_value(rule.getRule_representation(), targeted_prediction_time, rule.getRule_format()); + double slo_violation_probability = determine_slo_violation_probability(rule_severity); + Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand"); + Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability< slo_violation_probability_threshold ?" so it will not be published":" and it will be published")); + + if (slo_violation_probability>= slo_violation_probability_threshold) { + JSONObject severity_json = new JSONObject(); + severity_json.put("severity", rule_severity); + severity_json.put("probability", slo_violation_probability); + severity_json.put("predictionTime", targeted_prediction_time); + persistent_publisher.publish(severity_json.toJSONString()); + } + + slo_violation_event_recording_queue.add(System.currentTimeMillis()); + + //Probably not necessary to synchronize the line below as each removal will happen only once in a reconfiguration interval, and reconfiguration intervals are assumed to have a duration of minutes. + //Necessary to synchronize because another severity calculation thread might invoke clean_data above, and then a concurrent modification exception may arise + synchronized (ADAPTATION_TIMES_MODIFY){ + while (!ADAPTATION_TIMES_MODIFY.getValue()){ + ADAPTATION_TIMES_MODIFY.wait(); + } + ADAPTATION_TIMES_MODIFY.setValue(false); + adaptation_times_to_remove.add(targeted_prediction_time); //This line serves a different purpose from the adaptation_times.remove(...) directive above, as the adaptation_times_to_remove HashSet contains timepoints which should be processed to delete their data. + adaptation_times_pending_processing.remove(targeted_prediction_time); + ADAPTATION_TIMES_MODIFY.setValue(true); + ADAPTATION_TIMES_MODIFY.notifyAll(); + } + } catch (InterruptedException i) { + Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping..."); + return; + } + }); + internal_severity_calculation_thread.setName("internal_severity_calculation_thread_" + targeted_prediction_time); + internal_severity_calculation_thread.start(); + } catch (NoSuchElementException n) { + Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing..."); + continue; + } + } + running_threads.remove("severity_calculation_thread_"+rule.toString()); + }); + String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString(); + severity_calculation_thread.setName(severity_calculation_thread_name); + severity_calculation_thread.start(); + running_threads.put(severity_calculation_thread_name,severity_calculation_thread); + }/* + while (true){ + + }*/ + + //return slo_processors; + } + + private static Long get_next_targeted_prediction_time() { + List possible_targeted_prediction_times = adaptation_times.stream().sorted().limit(maximum_acceptable_forward_predictions).collect(Collectors.toList()); + for (int i=0; i adaptation_times_to_remove) { + for (Long processed_adaptation_time:adaptation_times_to_remove){ + if (processed_adaptation_time>last_processed_adaptation_time){ + last_processed_adaptation_time = processed_adaptation_time; + } + synchronized (can_modify_slo_rules) { + while (!can_modify_slo_rules.getValue()) { + try { + can_modify_slo_rules.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + can_modify_slo_rules.setValue(false); + for (SLORule slo_rule : slo_rules) { + for (SLOSubRule subrule : slo_rule.getSlo_subrules()) { + if (getPredicted_monitoring_attributes().containsKey(subrule.getId())) { + getPredicted_monitoring_attributes().get(subrule.getId()).remove(processed_adaptation_time); + } + } + } + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + } + } + } + + /** + * This function determines the probability of an SLO violation + * @param rule_severity The severity of the rule which has been determined + * @return The probability of the rule being violated. The minimum value of this probability is 0, and increases as the severity increases + */ + public static double determine_slo_violation_probability(double rule_severity) { + if (slo_violation_determination_method.equals("all-metrics")) { + //39.64 is the mean severity value when examining all integer severity values for roc x probability x confidence_interval x delta_value in (-100,100)x(0,100)x(0,100)x(-100,100) + /* + if (rule_severity >= 40) { + return Math.min((50 + 50*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 50% + // return Math.min((100*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 0% + } else { + return 0; + } + + */ + return Math.min(rule_severity/100,100); + }else if (slo_violation_determination_method.equals("prconf-delta")){ + //Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented"); + //return 0; + if (rule_severity >= 6.52){ + return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1); + }else{ + return 0; + } + + }else{ + Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method"); + return 0; + } + } + + public static AtomicBoolean getStop_signal() { + return stop_signal; + } + +} diff --git a/slo-violation-detector/src/main/java/runtime/Main.java b/slo-violation-detector/src/main/java/runtime/Main.java index 150d805..fb57e3b 100644 --- a/slo-violation-detector/src/main/java/runtime/Main.java +++ b/slo-violation-detector/src/main/java/runtime/Main.java @@ -10,59 +10,42 @@ package runtime; import eu.melodic.event.brokerclient.BrokerPublisher; import eu.melodic.event.brokerclient.BrokerSubscriber; -import metric_retrieval.AttributeSubscription; -import org.apache.commons.collections4.queue.CircularFifoQueue; -import org.json.simple.JSONArray; import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; import slo_processing.SLORule; -import slo_processing.SLOSubRule; import utilities.DebugDataSubscription; -import utilities.MathUtils; import utility_beans.*; import java.io.File; -import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; -import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; -import java.sql.Timestamp; import java.time.Clock; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; -import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; import static configuration.Constants.*; -import static java.lang.Thread.sleep; -import static slo_processing.SLORule.process_rule_value; -import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; +import static processing_logic.Runnables.slo_detector_mode_runnable; +import static utilities.OperationalModeUtils.getSLOViolationDetectionOperationalMode; +import static utilities.SLOViolationDetectorUtils.*; +import static utilities.SLOViolationDetectorStateUtils.*; +import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.attached; +import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached; +import static utility_beans.CharacterizedThread.CharacterizedThreadType.persistent_running_thread; import static utility_beans.RealtimeMonitoringAttribute.getMonitoring_attributes_bounds_representation; +@RestController +@SpringBootApplication public class Main { - public static final AtomicBoolean stop_signal = new AtomicBoolean(false); - public static final SynchronizedBoolean PREDICTION_EXISTS = new SynchronizedBoolean(false); - public static final SynchronizedBoolean ADAPTATION_TIMES_MODIFY = new SynchronizedBoolean(true); - public static SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap(); - public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap(); - public static ArrayList slo_rules = new ArrayList<>(); - public static HashMap running_threads = new HashMap<>(); - public static HashSet adaptation_times = new HashSet<>(); - public static HashSet adaptation_times_pending_processing = new HashSet<>(); - private static HashSet adaptation_times_to_remove = new HashSet<>(); - public static Long last_processed_adaptation_time = -1L;//initialization public static Long current_slo_rules_version = -1L;//initialization - public static final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false); - public static final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false); - - //Debugging variables - public static CircularFifoQueue slo_violation_event_recording_queue = new CircularFifoQueue<>(50); - public static CircularFifoQueue severity_calculation_event_recording_queue = new CircularFifoQueue<>(50); - private static Properties prop = new Properties(); - + public static boolean EXECUTION_NOT_TERMINATED = true; public static void main(String[] args) { //The input of this program is the type of the SLO violations which are monitored, and the predicted metric values which are evaluated. Its output are events which produce an estimate of the probability of an adaptation. @@ -74,645 +57,190 @@ public class Main { // - The prediction confidence try { - InputStream inputStream = null; - if (args.length == 0) { - base_project_path = new File("").toURI(); - URI absolute_configuration_file_path = new File(configuration_file_location).toURI(); - URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path); - Logger.getAnonymousLogger().log(info_logging_level,"This is the base project path:"+base_project_path); - inputStream = new FileInputStream(base_project_path.getPath()+relative_configuration_file_path); - } else { - Logger.getAnonymousLogger().log(info_logging_level, "A preferences file has been manually specified"); - - if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) { - base_project_path = new File(args[0]).toURI(); - } - inputStream = new FileInputStream(base_project_path.getPath()); - } - prop.load(inputStream); - String slo_rules_topic = prop.getProperty("slo_rules_topic"); - kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric","5")); - self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file")); - single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active")); - time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds")); - - slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold")); - slo_violation_determination_method = prop.getProperty("slo_violation_determination_method"); - maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions")); - ArrayList unbounded_metric_strings = new ArrayList(Arrays.asList(prop.getProperty("metrics_bounds").split(","))); - for (String metric_string : unbounded_metric_strings){ - getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";",2)[1]); - } - - while (true) { - - if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_VALUES_INPUT) { - - String json_file_name = prop.getProperty("input_file"); - slo_violation_determination_method = prop.getProperty("slo_violation_determination_method"); - confidence_interval = Double.parseDouble(prop.getProperty("confidence_interval")); - prediction_certainty = Double.parseDouble(prop.getProperty("prediction_certainty")); - - ArrayList metric_names = new ArrayList<>() {{ - add("cpu"); - add("ram"); - add("bandwidth"); - add("disk"); - }}; - HashMap input_data = new HashMap<>(); - for (String metric_name : metric_names) { - - Double metric_input_data = MathUtils.get_average(new ArrayList<>(Arrays.stream(prop.getProperty(metric_name).split(",")).map(Double::parseDouble).collect(Collectors.toList()))); - - input_data.put(metric_name, metric_input_data); - } - - - RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names); - RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes(metric_names); - - RealtimeMonitoringAttribute.update_monitoring_attributes_values_map(input_data); - - //Parsing of file - String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); - Logger.getAnonymousLogger().log(info_logging_level, rules_json_string); - Main.MESSAGE_CONTENTS.assign_value(slo_rules_topic, rules_json_string); - slo_rules.add(new SLORule(Main.MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(","))))); - + { + InputStream inputStream = null; + if (args.length == 0) { + operational_mode = getSLOViolationDetectionOperationalMode("DIRECTOR"); + inputStream = getPreferencesFileInputStream(EMPTY); + } else if (args.length == 1) { + Logger.getAnonymousLogger().log(info_logging_level, "Operational mode has been manually specified"); + operational_mode = getSLOViolationDetectionOperationalMode(args[0]); + inputStream = getPreferencesFileInputStream(EMPTY); } else { - if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_TOPICS_INPUT) { - synchronized (can_modify_slo_rules) { - //do { - // can_modify_slo_rules.wait(); - //}while(!can_modify_slo_rules.getValue()); - can_modify_slo_rules.setValue(false); - - slo_rules.add(new SLORule(Main.MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(","))))); - can_modify_slo_rules.setValue(true); - slo_rule_arrived.set(true); - can_modify_slo_rules.notifyAll(); - } - } else if (first_run){ - - BiFunction function = (topic, message) -> { - synchronized (can_modify_slo_rules) { - can_modify_slo_rules.setValue(true); - Main.MESSAGE_CONTENTS.assign_value(topic, message); - slo_rule_arrived.set(true); - can_modify_slo_rules.notifyAll(); - - Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic); - - } - return topic + ":MSG:" + message; - }; - - BrokerSubscriber subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); - new Thread(() -> { - while (true) { - subscriber.subscribe(function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals - Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds"); - try { - Thread.sleep(10000); - }catch (InterruptedException i){ - Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker"); - } - } - }).start(); - - if (self_publish_rule_file) { - String json_file_name = prop.getProperty("input_file"); - String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); - BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); - publisher.publish(rules_json_string); - Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string); - } - } - } - first_run = false; - synchronized (can_modify_slo_rules) { - do { - try { - can_modify_slo_rules.wait(); - }catch (InterruptedException i){ - i.printStackTrace(); - } - }while((!can_modify_slo_rules.getValue()) || (!slo_rule_arrived.get())); - can_modify_slo_rules.setValue(false); - slo_rule_arrived.set(false); - String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic); - if (slo_rule_arrived_has_updated_version(rule_representation)) { - if (single_slo_rule_active) { - slo_rules.clear(); - } - - ArrayList additional_metrics_from_new_slo = get_metric_list_from_JSON_slo(rule_representation); - - if (additional_metrics_from_new_slo.size() > 0) { - slo_rules.add(new SLORule(rule_representation, additional_metrics_from_new_slo)); - } - can_modify_slo_rules.setValue(true); - can_modify_slo_rules.notifyAll(); - }else{ - can_modify_slo_rules.setValue(true); - can_modify_slo_rules.notifyAll(); - continue; - } - - + Logger.getAnonymousLogger().log(info_logging_level, "Operational mode and preferences file has been manually specified"); + operational_mode = getSLOViolationDetectionOperationalMode(args[0]); + inputStream = getPreferencesFileInputStream(args[1]); } + prop.load(inputStream); + slo_rules_topic = prop.getProperty("slo_rules_topic"); + kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric", "5")); + self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file")); + single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active")); + time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds")); - stop_all_running_threads(); - DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password")); - initialize_monitoring_datastructures_with_empty_data(slo_rules); - // - initialize_subrule_and_attribute_associations(slo_rules); - initialize_attribute_subscribers(slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password")); - initialize_slo_processing(slo_rules); - - //while (true) { - - //} + slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold")); + slo_violation_determination_method = prop.getProperty("slo_violation_determination_method"); + maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions")); + ArrayList unbounded_metric_strings = new ArrayList(Arrays.asList(prop.getProperty("metrics_bounds").split(","))); + for (String metric_string : unbounded_metric_strings) { + getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";", 2)[1]); + } + } //initialization + if (operational_mode.equals(OperationalMode.DETECTOR)) { + start_new_slo_violation_detector_subcomponent(attached); + }else if (operational_mode.equals(OperationalMode.DIRECTOR)){ + /* + while (EXECUTION_NOT_TERMINATED) { + synchronized (create_new_slo_detector) { + create_new_slo_detector.wait(); + for (int i=1; i<=create_new_slo_detector.getValue(); i++) { + CharacterizedThread.create_new_thread(slo_detector_mode_runnable, "slo_detector_master_thread", persistent_running_thread, true); + } + create_new_slo_detector.setValue(0); + } + } + */ + SpringApplication.run(Main.class, args); + System.out.println("TEST"); + //TODO start a new DETECTOR instance when a relevant message appears } - }catch (Exception e){ + }catch (IOException e){ Logger.getAnonymousLogger().log(info_logging_level,"Problem reading input file"); e.printStackTrace(); - } - } - - private static boolean slo_rule_arrived_has_updated_version(String rule_representation) { - JSONObject json_object = null; - long json_object_version = Integer.MAX_VALUE; - try { - json_object = (JSONObject) new JSONParser().parse(rule_representation); - json_object_version = (Long) json_object.get("version"); - } catch (NullPointerException n){ - n.printStackTrace(); - Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored"); - return false; - } catch (Exception e){ + }catch (Exception e){ + Logger.getAnonymousLogger().log(info_logging_level,"Miscellaneous issue during startup"); e.printStackTrace(); - Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule..."); - return false; - } - if (json_object_version > current_slo_rules_version){ - Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived"); - current_slo_rules_version=json_object_version; - return true; - }else { - Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated"); - return false; } } - - private static void stop_all_running_threads() { - Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate"); - int initial_number_of_running_threads = running_threads.size(); - while (running_threads.size()>0) { - synchronized (stop_signal) { - stop_signal.set(true); - stop_signal.notifyAll(); - } - try { - Thread.sleep(3000); - running_threads.values().forEach(Thread::interrupt); - }catch (Exception e){ - } - Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads-running_threads.size())+"/"+initial_number_of_running_threads+" already running threads"); - if (running_threads.size()>1){ - Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+running_threads); - }else if (running_threads.size()>0){ - Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+running_threads); - } - } - Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated"); - synchronized (stop_signal) { - stop_signal.set(false); - } - synchronized (PREDICTION_EXISTS){ - PREDICTION_EXISTS.setValue(false); - } - adaptation_times.clear(); + @RequestMapping("/add-new-detector") + public static String start_new_slo_violation_detector_subcomponent() throws IOException { + start_new_slo_violation_detector_subcomponent(detached); + return ("Spawned new SLO Detector subcomponent instance!"); } - - public static void initialize_subrule_and_attribute_associations(ArrayList slo_rules) { - synchronized (can_modify_slo_rules) { - while (!can_modify_slo_rules.getValue()){ - try { - can_modify_slo_rules.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - can_modify_slo_rules.setValue(false); - for (SLORule slo_rule : slo_rules) { - for (SLOSubRule subrule : SLORule.parse_subrules(slo_rule.getRule_representation(),slo_rule.getRule_format())) { - SLOSubRule.getSlo_subrules_per_monitoring_attribute().computeIfAbsent(subrule.getMetric(), k -> new ArrayList<>()); - SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(subrule.getMetric()).add(subrule); - } - } - can_modify_slo_rules.setValue(true); - can_modify_slo_rules.notifyAll(); + public static void start_new_slo_violation_detector_subcomponent(CharacterizedThread.CharacterizedThreadRunMode characterized_thread_run_mode) throws IOException { + if (characterized_thread_run_mode.equals(attached)) { + run_slo_violation_detection_engine(); + }else/*detached mode*/{ + CharacterizedThread.create_new_thread(slo_detector_mode_runnable, "slo_detector_master_thread", persistent_running_thread, true); } } + public static void run_slo_violation_detection_engine() throws IOException { + while (true) { + if (first_run){ + //Creation of threads that should always run and are independent of the monitored application. + //1. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered + //2. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device + BiFunction slo_rule_topic_subscriber_function = (topic, message) -> { + synchronized (can_modify_slo_rules) { + can_modify_slo_rules.setValue(true); + MESSAGE_CONTENTS.assign_value(topic, message); + slo_rule_arrived.set(true); + can_modify_slo_rules.notifyAll(); - private static void initialize_monitoring_datastructures_with_empty_data(ArrayList slo_rules){ - for(SLORule slo_rule: slo_rules){ - for (String metric_name : slo_rule.get_monitoring_attributes()) { - MonitoringAttributeUtilities.initialize_values(metric_name); - } - } - } + Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic); - public static ArrayList get_metric_list_from_JSON_slo(String json_object_string) { - HashSet metric_list = new HashSet<>(); - try { - JSONObject json_object = (JSONObject) new JSONParser().parse(json_object_string); - String json_object_id = (String) json_object.get("id"); - String json_object_name = (String) json_object.get("name"); - //Older format uses id-based fields, newer format uses a non-variable structure - //We first check if an event using the older format is sent, and then check if the event is sent using the newer format - if (json_object_id!=null) { - if (json_object_id.split("-").length > 1) { - //String composite_rule_type = json_object_id.split("-")[0]; - JSONArray internal_json_slos = (JSONArray) json_object.get(json_object_id); - for (Object o : internal_json_slos) { - JSONObject internal_json_slo = (JSONObject) o; - metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString())); } - } else { - metric_list.add((String) json_object.get("attribute")); - } - } - //If newer format is used - else if (json_object_name!=null){ - JSONArray internal_json_slos = (JSONArray) json_object.get("constraints"); - if ((internal_json_slos!=null) && (internal_json_slos.size()>0)){ - for (Object o : internal_json_slos) { - JSONObject internal_json_slo = (JSONObject) o; - metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString())); - } - }else{ - metric_list.add((String) json_object.get("metric")); - } - }else{ - Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string); - } - }catch (Exception p){ - p.printStackTrace(); - return new ArrayList(); - } - return new ArrayList(metric_list); - } + return topic + ":MSG:" + message; + }; - private static double get_metric_value_from_JSON(String data_arrived) { - double result = -1; - - JSONObject json_data_representation = render_valid_json(data_arrived,"="); - //JSONObject json_data_representation = (JSONObject) new JSONParser().parse(data_arrived); - result = Double.parseDouble((String)json_data_representation.get("metricValue")); - - return result; - } - - /** - * This method replaces any invalid characters in the json which is received from the broker, and creates a valid JSON object - * @param data_arrived The data which is received from the broker - * @param string_to_replace The invalid character which should be substituted by an 'equals' sign - * @return A JSON object - */ - private static JSONObject render_valid_json(String data_arrived, String string_to_replace) { - String valid_json_string = new String(data_arrived); - JSONObject json_object = new JSONObject(); - valid_json_string = valid_json_string.replaceAll(string_to_replace,":"); - - valid_json_string = valid_json_string.replaceAll("[{}]",""); - - String [] json_elements = valid_json_string.split(","); - List json_elements_list = Arrays.stream(json_elements).map(String::trim).collect(Collectors.toList()); - - for (String element: json_elements_list) { - json_object.put(element.split(":")[0],element.split(":")[1]); - } - - return json_object; - } - - - private static void initialize_global_prediction_attribute_data(){ - Logger.getAnonymousLogger().log(warning_logging_level,"Get global prediction attribute data needs implementation"); - } - - - private static PredictionAttributeSet get_prediction_attribute_set(ArrayList rules){ - //usedglobalHashmap: attribute_data, - return null; - } - - private static PredictionAttributeSet initialize_with_existing_values(Double cpu_value, Double ram_value) { - ArrayList metric_names = new ArrayList<>(){{ - add("cpu"); - add("ram"); - add("hard_disk"); - }}; - RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names); - RealtimeMonitoringAttribute.update_monitoring_attribute_value("cpu",cpu_value); - RealtimeMonitoringAttribute.update_monitoring_attribute_value("ram",ram_value); - - PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); - PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); - - - PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}}); - - return predictionAttributeSet; - } - private static PredictionAttributeSet pseudo_initialize() throws Exception { - ArrayList metric_names = new ArrayList<>(){{ - add("cpu"); - add("ram"); - add("hard_disk"); - }}; - RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names); - - //initial cpu values - ArrayList cpu_values = new ArrayList<>(); - cpu_values.add(10.0); - cpu_values.add(20.0); - cpu_values.add(30.0); - cpu_values.add(40.0); - cpu_values.add(50.0); - cpu_values.add(40.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - cpu_values.add(50.0); - - MonitoringAttributeUtilities.initialize_values("cpu",MathUtils.get_average(cpu_values)); - - //initial ram values - ArrayList ram_values = new ArrayList<>(); - ram_values.add(20.0); - ram_values.add(20.0); - ram_values.add(25.0); - ram_values.add(45.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(40.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - ram_values.add(30.0); - MonitoringAttributeUtilities.initialize_values("ram",MathUtils.get_average(ram_values)); - - //Get prediction_attribute_sets and calculate method 1 on top of them - //Get individual prediction_attributes and calculate method 2 on top of them - - PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); - PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000); - - - PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}}); - - return predictionAttributeSet; - } - - private static ArrayList initialize_attribute_subscribers(ArrayList rules_list, String broker_ip_address, String broker_username, String broker_password){ - ArrayList attribute_subscribers = new ArrayList<>(); - for (SLORule rule:rules_list){ - attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password)); - } - return attribute_subscribers; - } - - public static void initialize_slo_processing(ArrayList rules_list){ - - for (SLORule rule:rules_list) { - - Thread severity_calculation_thread = new Thread(() -> { - BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); - - while (!stop_signal.get()) { - /*try { - Thread.sleep(time_horizon_seconds*1000); - } catch (InterruptedException e) { - e.printStackTrace(); - }*/ - synchronized (PREDICTION_EXISTS) { - while (!PREDICTION_EXISTS.getValue()) { - try { - PREDICTION_EXISTS.wait(); - } catch (InterruptedException e) { - synchronized (stop_signal) { - if (stop_signal.get()) { - running_threads.remove("severity_calculation_thread_" + rule.toString()); - PREDICTION_EXISTS.setValue(false); - return; - } - } - e.printStackTrace(); - } + BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + Runnable slo_rules_topic_subscriber_runnable = () -> { + while (true) { + slo_rule_topic_subscriber.subscribe(slo_rule_topic_subscriber_function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals + Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds"); + try { + Thread.sleep(10000); + }catch (InterruptedException i){ + Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker"); } - } - try { - Clock clock = Clock.systemUTC(); - Long current_time = clock.millis(); - Long targeted_prediction_time; - synchronized (ADAPTATION_TIMES_MODIFY) { - while (!ADAPTATION_TIMES_MODIFY.getValue()) { - try { - ADAPTATION_TIMES_MODIFY.wait(); - } catch (InterruptedException e) { - Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object"); - e.printStackTrace(); - } - } - ADAPTATION_TIMES_MODIFY.setValue(false); - clean_data(adaptation_times_to_remove); - //targeted_prediction_time = adaptation_times.stream().min(Long::compare).get(); - targeted_prediction_time = get_next_targeted_prediction_time(); - ADAPTATION_TIMES_MODIFY.setValue(true); - ADAPTATION_TIMES_MODIFY.notifyAll(); + }; + CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread", persistent_running_thread,true); + + + //Implementation of 'Lost edge device' thread + + BiFunction device_lost_subscriber_function = (topic, message) -> { + BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + + Clock clock = Clock.systemUTC(); + Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0); + JSONObject severity_json = new JSONObject(); + severity_json.put("severity", 100); + severity_json.put("probability", 100); + severity_json.put("predictionTime", current_time_seconds); + persistent_publisher.publish(severity_json.toJSONString()); + + //TODO possibly necessary to remove the next adaptation time as it will probably not be possible to start an adaptation during it + return topic + ":MSG:" + message; + }; + + BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + + Runnable device_lost_topic_subscriber_runnable = () -> { + while (true) { + device_lost_subscriber.subscribe(device_lost_subscriber_function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals + Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds"); + try { + Thread.sleep(10000); + }catch (InterruptedException i){ + Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker"); } - if (targeted_prediction_time==null){ - continue; - } - Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time); - Thread internal_severity_calculation_thread = new Thread(() -> { - try { - /* - synchronized (ADAPTATION_TIMES_MODIFY) { - while (!ADAPTATION_TIMES_MODIFY.getValue()) { - ADAPTATION_TIMES_MODIFY.wait(); - } - ADAPTATION_TIMES_MODIFY.setValue(false); - adaptation_times.remove(targeted_prediction_time);//remove from the list of timepoints which should be processed. Later this timepoint will be added to the adaptation_times_to_remove HashSet to remove any data associated with it - ADAPTATION_TIMES_MODIFY.setValue(true); - ADAPTATION_TIMES_MODIFY.notifyAll(); - } - //adaptation_times_pending_processing.add(targeted_prediction_time); - - */ - synchronized (PREDICTION_EXISTS) { - PREDICTION_EXISTS.setValue(adaptation_times.size() > 0); - } - - Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time; - if (sleep_time <= 0) { - Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000); - return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete) - } else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) { - Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)"); - return; //The predictions are too near to the targeted reconfiguration tim - } - Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds"); - sleep(sleep_time); - double rule_severity = process_rule_value(rule.getRule_representation(), targeted_prediction_time, rule.getRule_format()); - double slo_violation_probability = determine_slo_violation_probability(rule_severity); - Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand"); - Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability< slo_violation_probability_threshold ?" so it will not be published":" and it will be published")); - - if (slo_violation_probability>= slo_violation_probability_threshold) { - JSONObject severity_json = new JSONObject(); - severity_json.put("severity", rule_severity); - severity_json.put("probability", slo_violation_probability); - severity_json.put("predictionTime", targeted_prediction_time); - persistent_publisher.publish(severity_json.toJSONString()); - } - - slo_violation_event_recording_queue.add(System.currentTimeMillis()); - - //Probably not necessary to synchronize the line below as each removal will happen only once in a reconfiguration interval, and reconfiguration intervals are assumed to have a duration of minutes. - //Necessary to synchronize because another severity calculation thread might invoke clean_data above, and then a concurrent modification exception may arise - synchronized (ADAPTATION_TIMES_MODIFY){ - while (!ADAPTATION_TIMES_MODIFY.getValue()){ - ADAPTATION_TIMES_MODIFY.wait(); - } - ADAPTATION_TIMES_MODIFY.setValue(false); - adaptation_times_to_remove.add(targeted_prediction_time); //This line serves a different purpose from the adaptation_times.remove(...) directive above, as the adaptation_times_to_remove HashSet contains timepoints which should be processed to delete their data. - adaptation_times_pending_processing.remove(targeted_prediction_time); - ADAPTATION_TIMES_MODIFY.setValue(true); - ADAPTATION_TIMES_MODIFY.notifyAll(); - } - } catch (InterruptedException i) { - Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping..."); - return; - } - }); - internal_severity_calculation_thread.setName("internal_severity_calculation_thread_" + targeted_prediction_time); - internal_severity_calculation_thread.start(); - } catch (NoSuchElementException n) { - Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing..."); - continue; } + }; + CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",persistent_running_thread,true); + + + if (self_publish_rule_file) { + String json_file_name = prop.getProperty("input_file"); + String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); + BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + publisher.publish(rules_json_string); + Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string); } - running_threads.remove("severity_calculation_thread_"+rule.toString()); - }); - String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString(); - severity_calculation_thread.setName(severity_calculation_thread_name); - severity_calculation_thread.start(); - running_threads.put(severity_calculation_thread_name,severity_calculation_thread); - }/* - while (true){ - - }*/ - - //return slo_processors; - } - - private static Long get_next_targeted_prediction_time() { - List possible_targeted_prediction_times = adaptation_times.stream().sorted().limit(maximum_acceptable_forward_predictions).collect(Collectors.toList()); - for (int i=0; i adaptation_times_to_remove) { - for (Long processed_adaptation_time:adaptation_times_to_remove){ - if (processed_adaptation_time>last_processed_adaptation_time){ - last_processed_adaptation_time = processed_adaptation_time; - } - synchronized (Main.can_modify_slo_rules) { - while (!Main.can_modify_slo_rules.getValue()) { + first_run = false; + synchronized (can_modify_slo_rules) { + do { try { can_modify_slo_rules.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); + }catch (InterruptedException i){ + i.printStackTrace(); } - } + }while((!can_modify_slo_rules.getValue()) || (!slo_rule_arrived.get())); can_modify_slo_rules.setValue(false); - for (SLORule slo_rule : slo_rules) { - for (SLOSubRule subrule : slo_rule.getSlo_subrules()) { - if (getPredicted_monitoring_attributes().containsKey(subrule.getId())) { - getPredicted_monitoring_attributes().get(subrule.getId()).remove(processed_adaptation_time); - } + slo_rule_arrived.set(false); + String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic); + if (slo_rule_arrived_has_updated_version(rule_representation)) { + if (single_slo_rule_active) { + slo_rules.clear(); } + + ArrayList additional_metrics_from_new_slo = get_metric_list_from_JSON_slo(rule_representation); + + if (additional_metrics_from_new_slo.size() > 0) { + slo_rules.add(new SLORule(rule_representation, additional_metrics_from_new_slo)); + } + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + }else{ + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + continue; } - can_modify_slo_rules.setValue(true); - can_modify_slo_rules.notifyAll(); } + + stop_all_running_threads(); + DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password")); + initialize_monitoring_datastructures_with_empty_data(slo_rules); + // + initialize_subrule_and_attribute_associations(slo_rules); + initialize_attribute_subscribers(slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password")); + initialize_slo_processing(slo_rules); + } } - /** - * This function determines the probability of an SLO violation - * @param rule_severity The severity of the rule which has been determined - * @return The probability of the rule being violated. The minimum value of this probability is 0, and increases as the severity increases - */ - public static double determine_slo_violation_probability(double rule_severity) { - if (slo_violation_determination_method.equals("all-metrics")) { - //39.64 is the mean severity value when examining all integer severity values for roc x probability x confidence_interval x delta_value in (-100,100)x(0,100)x(0,100)x(-100,100) - /* - if (rule_severity >= 40) { - return Math.min((50 + 50*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 50% - // return Math.min((100*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 0% - } else { - return 0; - } - - */ - return Math.min(rule_severity/100,100); - }else if (slo_violation_determination_method.equals("prconf-delta")){ - //Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented"); - //return 0; - if (rule_severity >= 6.52){ - return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1); - }else{ - return 0; - } - - }else{ - Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method"); - return 0; - } - } - - public static AtomicBoolean getStop_signal() { - return stop_signal; - } - } diff --git a/slo-violation-detector/src/main/java/slo_processing/SLORule.java b/slo-violation-detector/src/main/java/slo_processing/SLORule.java index ea02f57..a163683 100644 --- a/slo-violation-detector/src/main/java/slo_processing/SLORule.java +++ b/slo-violation-detector/src/main/java/slo_processing/SLORule.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import static configuration.Constants.*; import static slo_processing.SLOSubRule.find_rule_type; +import static utilities.SLOViolationDetectorStateUtils.severity_calculation_event_recording_queue; import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; public class SLORule { @@ -225,7 +226,7 @@ public class SLORule { calculation_logging_string.append("\nDue to the severity value being over 10000, it is replaced by 10000"); rule_result_value = 10000; } - Main.severity_calculation_event_recording_queue.add(calculation_logging_string.toString()); + severity_calculation_event_recording_queue.add(calculation_logging_string.toString()); return rule_result_value; } diff --git a/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java b/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java index 5352900..44002f1 100644 --- a/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java +++ b/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java @@ -5,18 +5,19 @@ import eu.melodic.event.brokerclient.BrokerSubscriber; import org.apache.commons.collections4.queue.CircularFifoQueue; import runtime.Main; import slo_processing.SLOSubRule; +import utility_beans.CharacterizedThread; import utility_beans.RealtimeMonitoringAttribute; import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; import java.util.logging.Logger; import static configuration.Constants.amq_library_configuration_location; import static configuration.Constants.info_logging_level; -import static runtime.Main.running_threads; -import static runtime.Main.slo_violation_event_recording_queue; +import static runtime.Main.*; +import static utilities.SLOViolationDetectorStateUtils.*; +import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread; /** * The objective of this class is to allow a structured synopsis of the current state of the SLO Violation Detector to be created, as a response to a request sent to it through an appropriate topic. @@ -33,7 +34,18 @@ public class DebugDataSubscription { intermediate_debug_string = new StringBuilder(intermediate_debug_string + "The following threads are currently running" + "\n"); boolean flag_first_element_iterated = true; - for (String s : running_threads.keySet()){ + intermediate_debug_string.append("Persistent running threads:\n"); + for (String s : persistent_running_threads.keySet()){ + if (flag_first_element_iterated) { + intermediate_debug_string.append(s); + flag_first_element_iterated = false; + }else{ + intermediate_debug_string.append(",\n").append(s); + } + } + flag_first_element_iterated = true; + intermediate_debug_string.append("SLO-bound running threads:\n"); + for (String s : slo_bound_running_threads.keySet()){ if (flag_first_element_iterated) { intermediate_debug_string.append(s); flag_first_element_iterated = false; @@ -81,7 +93,7 @@ public class DebugDataSubscription { output_debug_data = output_debug_data+intermediate_debug_string; intermediate_debug_string = new StringBuilder(); - output_debug_data = output_debug_data+"\nShowing the adaptation times that pend processing:\n"+ Main.adaptation_times_pending_processing; + output_debug_data = output_debug_data+"\nShowing the adaptation times that pend processing:\n"+ adaptation_times_pending_processing; intermediate_debug_string.append("\nThese are the timestamps of the latest adaptation events\n").append(slo_violation_event_recording_queue); Logger.getGlobal().log(info_logging_level,"Debug data generated:\n"+output_debug_data); @@ -91,11 +103,11 @@ public class DebugDataSubscription { }; public static void initiate(String broker_ip_address, String broker_username, String broker_password) { BrokerSubscriber debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location); - Thread debug_data_subscription_thread = new Thread(() -> { + Runnable debug_data_subscription_runnable = () -> { try { - synchronized (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) { + synchronized (HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) { //if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_topic_name).getValue()) - debug_data_subscriber.subscribe(debug_data_generation, Main.stop_signal); + debug_data_subscriber.subscribe(debug_data_generation, stop_signal); } if (Thread.interrupted()) { throw new InterruptedException(); @@ -107,10 +119,9 @@ public class DebugDataSubscription { } } finally { Logger.getAnonymousLogger().log(info_logging_level, "Removing debug data subscriber thread for " + debug_data_trigger_topic_name); - running_threads.remove("debug_data_subscription_thread_" + debug_data_trigger_topic_name); + slo_bound_running_threads.remove("debug_data_subscription_thread_" + debug_data_trigger_topic_name); } - }); - running_threads.put("debug_data_subscription_thread_" + debug_data_trigger_topic_name, debug_data_subscription_thread); - debug_data_subscription_thread.start(); + }; + CharacterizedThread.create_new_thread(debug_data_subscription_runnable,"debug_data_subscription_thread_" + debug_data_trigger_topic_name,slo_bound_running_thread,true); } } diff --git a/slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeUtilities.java b/slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java similarity index 96% rename from slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeUtilities.java rename to slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java index 023976b..7eccf95 100644 --- a/slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeUtilities.java +++ b/slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java @@ -6,9 +6,11 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -package utility_beans; +package utilities; import slo_processing.SLOSubRule; +import utility_beans.MonitoringAttributeStatistics; +import utility_beans.RealtimeMonitoringAttribute; import java.util.ArrayList; import java.util.HashMap; diff --git a/slo-violation-detector/src/main/java/utilities/OperationalModeUtils.java b/slo-violation-detector/src/main/java/utilities/OperationalModeUtils.java new file mode 100644 index 0000000..11ebfc2 --- /dev/null +++ b/slo-violation-detector/src/main/java/utilities/OperationalModeUtils.java @@ -0,0 +1,20 @@ +package utilities; + +import utility_beans.OperationalMode; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class OperationalModeUtils{ + public static OperationalMode getSLOViolationDetectionOperationalMode(String operational_mode) { + if (operational_mode.equalsIgnoreCase("DIRECTOR")){ + return OperationalMode.DIRECTOR; + }else if (operational_mode.equalsIgnoreCase("DETECTOR")){ + return OperationalMode.DETECTOR; + } + else{ + Logger.getAnonymousLogger().log(Level.SEVERE,"Creating new SLO Violation Detection instance as a DETECTOR node, however the specification of the type of node whould be DIRECTOR or DETECTOR, not "+operational_mode); + return OperationalMode.DIRECTOR; + } + } +} \ No newline at end of file diff --git a/slo-violation-detector/src/main/java/utilities/SLOViolationDetectorStateUtils.java b/slo-violation-detector/src/main/java/utilities/SLOViolationDetectorStateUtils.java new file mode 100644 index 0000000..c92351a --- /dev/null +++ b/slo-violation-detector/src/main/java/utilities/SLOViolationDetectorStateUtils.java @@ -0,0 +1,61 @@ +package utilities; + +import org.apache.commons.collections4.queue.CircularFifoQueue; +import slo_processing.SLORule; +import utility_beans.*; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +import static configuration.Constants.*; + +public class SLOViolationDetectorStateUtils { + public static ArrayList slo_rules = new ArrayList<>(); + public static HashMap slo_bound_running_threads = new HashMap<>(); + public static HashMap persistent_running_threads = new HashMap<>(); + public static HashSet adaptation_times = new HashSet<>(); + public static HashSet adaptation_times_pending_processing = new HashSet<>(); + public static Long last_processed_adaptation_time = -1L;//initialization + public static int slo_violation_detection_component_instance_identifier; + private static String self_starting_command_string = "java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar > $LOG_FILE 2>&1"; + public static OperationalMode operational_mode; + public static final AtomicBoolean stop_signal = new AtomicBoolean(false); + public static final SynchronizedInteger create_new_slo_detector = new SynchronizedInteger(0); + public static final SynchronizedBoolean PREDICTION_EXISTS = new SynchronizedBoolean(false); + public static final SynchronizedBoolean ADAPTATION_TIMES_MODIFY = new SynchronizedBoolean(true); + public static SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap(); + public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap(); + + public static final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false); + public static final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false); + + //Debugging variables + public static CircularFifoQueue slo_violation_event_recording_queue = new CircularFifoQueue<>(50); + public static CircularFifoQueue severity_calculation_event_recording_queue = new CircularFifoQueue<>(50); + public static Properties prop = new Properties(); + + + public static InputStream getPreferencesFileInputStream(String custom_properties_file_path) throws IOException { + if (custom_properties_file_path==null || custom_properties_file_path.equals(EMPTY)) { + base_project_path = new File(EMPTY).toURI(); + URI absolute_configuration_file_path = new File(configuration_file_location).toURI(); + URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path); + Logger.getAnonymousLogger().log(info_logging_level, "This is the base project path:" + base_project_path); + return new FileInputStream(base_project_path.getPath() + relative_configuration_file_path); + }else{ + if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) { + base_project_path = new File(custom_properties_file_path).toURI(); + } + return new FileInputStream(base_project_path.getPath()); + } + } +} diff --git a/slo-violation-detector/src/main/java/utilities/SLOViolationDetectorUtils.java b/slo-violation-detector/src/main/java/utilities/SLOViolationDetectorUtils.java new file mode 100644 index 0000000..ba80483 --- /dev/null +++ b/slo-violation-detector/src/main/java/utilities/SLOViolationDetectorUtils.java @@ -0,0 +1,252 @@ +package utilities; + +import metric_retrieval.AttributeSubscription; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import slo_processing.SLORule; +import slo_processing.SLOSubRule; +import utility_beans.CharacterizedThread; + +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +import static configuration.Constants.*; +import static processing_logic.Runnables.get_severity_calculation_runnable; +import static runtime.Main.*; +import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread; +import static utilities.SLOViolationDetectorStateUtils.*; +import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; + +public class SLOViolationDetectorUtils { + + public static HashSet adaptation_times_to_remove = new HashSet<>(); + + + public static void initialize_subrule_and_attribute_associations(ArrayList slo_rules) { + synchronized (can_modify_slo_rules) { + while (!can_modify_slo_rules.getValue()){ + try { + can_modify_slo_rules.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + can_modify_slo_rules.setValue(false); + for (SLORule slo_rule : slo_rules) { + for (SLOSubRule subrule : SLORule.parse_subrules(slo_rule.getRule_representation(),slo_rule.getRule_format())) { + SLOSubRule.getSlo_subrules_per_monitoring_attribute().computeIfAbsent(subrule.getMetric(), k -> new ArrayList<>()); + SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(subrule.getMetric()).add(subrule); + } + } + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + } + } + + + public static void initialize_monitoring_datastructures_with_empty_data(ArrayList slo_rules){ + for(SLORule slo_rule: slo_rules){ + for (String metric_name : slo_rule.get_monitoring_attributes()) { + MonitoringAttributeUtilities.initialize_values(metric_name); + } + } + } + + public static void initialize_slo_processing(ArrayList rules_list){ + + for (SLORule rule:rules_list) { + + String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString(); + CharacterizedThread.create_new_thread(get_severity_calculation_runnable(rule),severity_calculation_thread_name, slo_bound_running_thread,true); + + } + } + + + public static void clean_data(HashSet adaptation_times_to_remove) { + for (Long processed_adaptation_time:adaptation_times_to_remove){ + if (processed_adaptation_time>last_processed_adaptation_time){ + last_processed_adaptation_time = processed_adaptation_time; + } + synchronized (can_modify_slo_rules) { + while (!can_modify_slo_rules.getValue()) { + try { + can_modify_slo_rules.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + can_modify_slo_rules.setValue(false); + for (SLORule slo_rule : slo_rules) { + for (SLOSubRule subrule : slo_rule.getSlo_subrules()) { + if (getPredicted_monitoring_attributes().containsKey(subrule.getId())) { + getPredicted_monitoring_attributes().get(subrule.getId()).remove(processed_adaptation_time); + } + } + } + can_modify_slo_rules.setValue(true); + can_modify_slo_rules.notifyAll(); + } + } + } + + public static Long get_next_targeted_prediction_time() { + List possible_targeted_prediction_times = adaptation_times.stream().sorted().limit(maximum_acceptable_forward_predictions).collect(Collectors.toList()); + for (int i=0; i initialize_attribute_subscribers(ArrayList rules_list, String broker_ip_address, String broker_username, String broker_password){ + ArrayList attribute_subscribers = new ArrayList<>(); + for (SLORule rule:rules_list){ + attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password)); + } + return attribute_subscribers; + } + + public static AtomicBoolean getStop_signal() { + return stop_signal; + } + + public static boolean slo_rule_arrived_has_updated_version(String rule_representation) { + JSONObject json_object = null; + long json_object_version = Integer.MAX_VALUE; + try { + json_object = (JSONObject) new JSONParser().parse(rule_representation); + json_object_version = (Long) json_object.get("version"); + } catch (NullPointerException n){ + n.printStackTrace(); + Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored"); + return false; + } catch (Exception e){ + e.printStackTrace(); + Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule..."); + return false; + } + if (json_object_version > current_slo_rules_version){ + Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived"); + current_slo_rules_version=json_object_version; + return true; + }else { + Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated"); + return false; + } + } + + public static void stop_all_running_threads() { + Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate"); + int initial_number_of_running_threads = slo_bound_running_threads.size(); + while (slo_bound_running_threads.size()>0) { + synchronized (stop_signal) { + stop_signal.set(true); + stop_signal.notifyAll(); + } + try { + Thread.sleep(3000); + slo_bound_running_threads.values().forEach(Thread::interrupt); + }catch (Exception e){ + } + Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads- slo_bound_running_threads.size())+"/"+initial_number_of_running_threads+" already running threads"); + if (slo_bound_running_threads.size()>1){ + Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+ slo_bound_running_threads); + }else if (slo_bound_running_threads.size()>0){ + Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+ slo_bound_running_threads); + } + } + Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated"); + synchronized (stop_signal) { + stop_signal.set(false); + } + synchronized (PREDICTION_EXISTS){ + PREDICTION_EXISTS.setValue(false); + } + adaptation_times.clear(); + } + + public static ArrayList get_metric_list_from_JSON_slo(String json_object_string) { + HashSet metric_list = new HashSet<>(); + try { + JSONObject json_object = (JSONObject) new JSONParser().parse(json_object_string); + String json_object_id = (String) json_object.get("id"); + String json_object_name = (String) json_object.get("name"); + //Older format uses id-based fields, newer format uses a non-variable structure + //We first check if an event using the older format is sent, and then check if the event is sent using the newer format + if (json_object_id!=null) { + if (json_object_id.split("-").length > 1) { + //String composite_rule_type = json_object_id.split("-")[0]; + JSONArray internal_json_slos = (JSONArray) json_object.get(json_object_id); + for (Object o : internal_json_slos) { + JSONObject internal_json_slo = (JSONObject) o; + metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString())); + } + } else { + metric_list.add((String) json_object.get("attribute")); + } + } + //If newer format is used + else if (json_object_name!=null){ + JSONArray internal_json_slos = (JSONArray) json_object.get("constraints"); + if ((internal_json_slos!=null) && (internal_json_slos.size()>0)){ + for (Object o : internal_json_slos) { + JSONObject internal_json_slo = (JSONObject) o; + metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString())); + } + }else{ + metric_list.add((String) json_object.get("metric")); + } + }else{ + Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string); + } + }catch (Exception p){ + p.printStackTrace(); + return new ArrayList(); + } + return new ArrayList(metric_list); + } + + /** + * This function determines the probability of an SLO violation + * @param rule_severity The severity of the rule which has been determined + * @return The probability of the rule being violated. The minimum value of this probability is 0, and increases as the severity increases + */ + public static double determine_slo_violation_probability(double rule_severity) { + if (slo_violation_determination_method.equals("all-metrics")) { + //39.64 is the mean severity value when examining all integer severity values for roc x probability x confidence_interval x delta_value in (-100,100)x(0,100)x(0,100)x(-100,100) + /* + if (rule_severity >= 40) { + return Math.min((50 + 50*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 50% + // return Math.min((100*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 0% + } else { + return 0; + } + + */ + return Math.min(rule_severity/100,100); + }else if (slo_violation_determination_method.equals("prconf-delta")){ + //Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented"); + //return 0; + if (rule_severity >= 6.52){ + return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1); + }else{ + return 0; + } + + }else{ + Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method"); + return 0; + } + } + +} diff --git a/slo-violation-detector/src/main/java/utility_beans/CharacterizedThread.java b/slo-violation-detector/src/main/java/utility_beans/CharacterizedThread.java new file mode 100644 index 0000000..e197f50 --- /dev/null +++ b/slo-violation-detector/src/main/java/utility_beans/CharacterizedThread.java @@ -0,0 +1,34 @@ +package utility_beans; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import static utilities.SLOViolationDetectorStateUtils.persistent_running_threads; +import static utilities.SLOViolationDetectorStateUtils.slo_bound_running_threads; + +public class CharacterizedThread{ + public enum CharacterizedThreadType{ + slo_bound_running_thread,persistent_running_thread,undefined + } + + public enum CharacterizedThreadRunMode{ + attached,detached + } + + public static Thread create_new_thread(Runnable runnable, String thread_name, CharacterizedThreadType thread_type, boolean start_thread_now){ + Thread thread = new Thread(runnable); + thread.setName(thread_name); + if (thread_type.equals(CharacterizedThreadType.slo_bound_running_thread)){ + slo_bound_running_threads.put(thread_name,thread); + }else if (thread_type.equals(CharacterizedThreadType.persistent_running_thread)){ + persistent_running_threads.put(thread_name,thread); + }else{ + Logger.getAnonymousLogger().log(Level.WARNING,"Undefined type of thread for thread with name: "+thread_name); + } + if (start_thread_now) { + thread.start(); + } + return thread; + } + +} diff --git a/slo-violation-detector/src/main/java/utility_beans/OperationalMode.java b/slo-violation-detector/src/main/java/utility_beans/OperationalMode.java new file mode 100644 index 0000000..36fd1dc --- /dev/null +++ b/slo-violation-detector/src/main/java/utility_beans/OperationalMode.java @@ -0,0 +1,5 @@ +package utility_beans; + +public enum OperationalMode { + DIRECTOR,DETECTOR +} \ No newline at end of file diff --git a/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java b/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java index afdeaa0..69a0979 100644 --- a/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java +++ b/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java @@ -13,7 +13,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import static configuration.Constants.*; -import static utility_beans.MonitoringAttributeUtilities.isZero; +import static utilities.MonitoringAttributeUtilities.isZero; import static utility_beans.RealtimeMonitoringAttribute.*; public class PredictedMonitoringAttribute { diff --git a/slo-violation-detector/src/main/java/utility_beans/SynchronizedInteger.java b/slo-violation-detector/src/main/java/utility_beans/SynchronizedInteger.java new file mode 100644 index 0000000..36db47e --- /dev/null +++ b/slo-violation-detector/src/main/java/utility_beans/SynchronizedInteger.java @@ -0,0 +1,19 @@ +package utility_beans; + +public class SynchronizedInteger { + private Integer value; + public SynchronizedInteger(Integer value){ + this.value = value; + } + public SynchronizedInteger(){ + this(0); + } + + public Integer getValue() { + return value; + } + + public void setValue(Integer value) { + this.value = value; + } +} diff --git a/slo-violation-detector/src/main/resources/META-INF/MANIFEST.MF b/slo-violation-detector/src/main/resources/META-INF/MANIFEST.MF new file mode 100644 index 0000000..de57e25 --- /dev/null +++ b/slo-violation-detector/src/main/resources/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: runtime.Main + diff --git a/slo-violation-detector/src/main/resources/application.properties b/slo-violation-detector/src/main/resources/application.properties new file mode 100644 index 0000000..56eb86c --- /dev/null +++ b/slo-violation-detector/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.profiles.active=production \ No newline at end of file diff --git a/slo-violation-detector/src/test/java/ConnectivityTests.java b/slo-violation-detector/src/test/java/ConnectivityTests.java index d648589..8440918 100644 --- a/slo-violation-detector/src/test/java/ConnectivityTests.java +++ b/slo-violation-detector/src/test/java/ConnectivityTests.java @@ -8,13 +8,11 @@ import eu.melodic.event.brokerclient.BrokerPublisher; import eu.melodic.event.brokerclient.BrokerSubscriber; -import org.apache.commons.lang3.mutable.MutableBoolean; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import org.junit.Test; -import javax.jms.JMSException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -26,7 +24,7 @@ import java.util.function.BiFunction; import java.util.logging.Logger; import static configuration.Constants.*; -import static runtime.Main.running_threads; +import static utilities.SLOViolationDetectorStateUtils.slo_bound_running_threads; public class ConnectivityTests { @@ -74,7 +72,7 @@ public class ConnectivityTests { subscriber.subscribe(slo_function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false }); subscription_thread.start(); - running_threads.put("Test topic subscription thread",subscription_thread); + slo_bound_running_threads.put("Test topic subscription thread",subscription_thread); publisher.publish(object_to_publish.toJSONString()); try { diff --git a/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java b/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java index 0d27d77..4e7b4c9 100644 --- a/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java +++ b/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java @@ -8,7 +8,6 @@ import org.junit.Test; import utility_beans.MonitoringAttributeStatistics; -import utility_beans.MonitoringAttributeUtilities; import utility_beans.RealtimeMonitoringAttribute; import utility_beans.PredictedMonitoringAttribute; diff --git a/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java b/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java index ca588d8..12da059 100644 --- a/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java +++ b/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java @@ -19,7 +19,7 @@ import org.junit.Test; import runtime.Main; import slo_processing.SLORule; import slo_processing.SLOSubRule; -import utility_beans.MonitoringAttributeUtilities; +import utilities.MonitoringAttributeUtilities; import utility_beans.PredictedMonitoringAttribute; import utility_beans.RealtimeMonitoringAttribute; @@ -39,9 +39,9 @@ import java.util.logging.Level; import java.util.logging.Logger; import static configuration.Constants.*; -import static runtime.Main.PREDICTION_EXISTS; -import static runtime.Main.initialize_subrule_and_attribute_associations; import static slo_processing.SLORule.process_rule_value; +import static utilities.SLOViolationDetectorUtils.initialize_subrule_and_attribute_associations; +import static utilities.SLOViolationDetectorStateUtils.*; import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; import static utility_beans.RealtimeMonitoringAttribute.*; @@ -81,7 +81,7 @@ public class UnboundedMonitoringAttributeTests { public void unbounded_monitoring_attribute_test_core(String json_file_name, String metric_1_name, Double[] metric_lower_bound_range, Double[] metric_upper_bound_range, double severity_lower_bound, double base_metric_value, double metric_max_value, double forecasted_metric_value,double generated_data_confidence_interval, double probability) throws IOException, ParseException { - Main.can_modify_slo_rules.setValue(true); + can_modify_slo_rules.setValue(true); Properties prop = new Properties(); URI absolute_configuration_file_path = new File(configuration_file_location).toURI(); @@ -153,15 +153,15 @@ public class UnboundedMonitoringAttributeTests { long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.prediction_time)).longValue(); Logger.getAnonymousLogger().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value); - synchronized (Main.can_modify_slo_rules) { - if(!Main.can_modify_slo_rules.getValue()) { - Main.can_modify_slo_rules.wait(); + synchronized (can_modify_slo_rules) { + if(!can_modify_slo_rules.getValue()) { + can_modify_slo_rules.wait(); } - Main.can_modify_slo_rules.setValue(false); + can_modify_slo_rules.setValue(false); - if( Main.adaptation_times.size()==0 || (!Main.adaptation_times.contains(targeted_prediction_time)) && targeted_prediction_time>Main.adaptation_times.stream().min(Long::compare).get()){ + if( adaptation_times.size()==0 || (!adaptation_times.contains(targeted_prediction_time)) && targeted_prediction_time>adaptation_times.stream().min(Long::compare).get()){ Logger.getAnonymousLogger().log(info_logging_level,"Adding a new targeted prediction time "+targeted_prediction_time); - Main.adaptation_times.add(targeted_prediction_time); + adaptation_times.add(targeted_prediction_time); synchronized (PREDICTION_EXISTS) { PREDICTION_EXISTS.setValue(true); PREDICTION_EXISTS.notifyAll(); @@ -181,7 +181,7 @@ public class UnboundedMonitoringAttributeTests { getPredicted_monitoring_attributes().get(subrule.getId()).put(targeted_prediction_time, prediction_attribute); } } - Main.can_modify_slo_rules.setValue(true); + can_modify_slo_rules.setValue(true); } //SLOViolationCalculator.get_Severity_all_metrics_method(prediction_attribute) @@ -191,7 +191,7 @@ public class UnboundedMonitoringAttributeTests { return message; }; Thread forecasted_subscription_thread = new Thread(() -> { - synchronized (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) { + synchronized (HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) { //if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name).getValue()) forecasted_subscriber.subscribe(forecasted_function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false }