diff --git a/slo-violation-detector/Dockerfile b/slo-violation-detector/Dockerfile index 4344c92..a6bec8a 100644 --- a/slo-violation-detector/Dockerfile +++ b/slo-violation-detector/Dockerfile @@ -14,8 +14,8 @@ RUN mvn -f /home/app/pom.xml -DfailIfNoTests=false -Dtest=!UnboundedMonitoringAt FROM docker.io/library/eclipse-temurin:17 RUN mkdir -p /home/src/main/resources/ -COPY src/main/resources/config/eu.nebulous.slo_violation_detector.properties /home/src/main/resources/config/input_data.properties -COPY src/main/resources/config/eu.melodic.event.brokerclient.properties /home/src/main/resources/config/eu.melodic.event.brokerclient.properties +COPY src/main/resources/config/eu.nebulous.slo_violation_detector.properties /home/slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties +COPY src/main/resources/config/eu.melodic.event.brokerclient.properties /home/slo-violation-detector/src/main/resources/config/eu.melodic.event.brokerclient.properties COPY --from=build /home/app/target/SLO-Violation-Detector-4.0-SNAPSHOT.jar /home/SLOSeverityCalculator-4.0-SNAPSHOT.jar WORKDIR /home ENV LOG_FILE /home/slo_violation_detector.log diff --git a/slo-violation-detector/src/main/java/configuration/Constants.java b/slo-violation-detector/src/main/java/configuration/Constants.java index 1b31953..9f2dd6f 100644 --- a/slo-violation-detector/src/main/java/configuration/Constants.java +++ b/slo-violation-detector/src/main/java/configuration/Constants.java @@ -27,18 +27,18 @@ public class Constants { public static URI base_project_path; public static String configuration_file_location = "slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties"; public static String amq_library_configuration_location = "slo-violation-detector/src/main/resources/config/eu.melodic.event.brokerclient.properties"; - public static String topic_for_severity_announcement = "prediction.slo_severity_value"; + public static String topic_for_severity_announcement = "eu.nebulouscloud.monitoring.slo.severity_value"; public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost"; public static String slo_rules_topic = "eu.nebulouscloud.monitoring.slo.new"; public static String metric_list_topic = "eu.nebulouscloud.monitoring.metric_list"; - public static String realtime_metrics_topic = "eu.nebulouscloud.monitoring.realtime."; - public static String final_metric_prediction_topic = "eu.nebulouscloud.monitoring.predicted."; + public static String topic_prefix_realtime_metrics = "eu.nebulouscloud.monitoring.realtime."; + public static String topic_prefix_final_predicted_metrics = "eu.nebulouscloud.monitoring.predicted."; + public static String nebulous_components_application = "nebulous_components_application"; public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation public static String roc_calculation_mode = "prototype"; public static boolean self_publish_rule_file = false; //default value to be overriden public static boolean single_slo_rule_active = true; //default value to be overriden - public static boolean first_run = true; public static double roc_limit = 1; public static double epsilon = 0.00000000001; public static Level debug_logging_level = Level.OFF; diff --git a/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java b/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java index 114063f..f7e3a8d 100644 --- a/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java +++ b/slo-violation-detector/src/main/java/metric_retrieval/AttributeSubscription.java @@ -12,7 +12,7 @@ package metric_retrieval; //import eu.melodic.event.brokerclient.templates.EventFields; //import eu.melodic.event.brokerclient.templates.TopicNames; import slo_violation_detector_engine.detector.DetectorSubcomponent; -import utility_beans.BrokerSubscriber; +import utility_beans.*; import utility_beans.BrokerSubscriber.EventFields; import utility_beans.BrokerSubscriber.TopicNames; import org.json.simple.JSONArray; @@ -21,9 +21,6 @@ import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import slo_rule_modelling.SLORule; import slo_rule_modelling.SLOSubRule; -import utility_beans.CharacterizedThread; -import utility_beans.PredictedMonitoringAttribute; -import utility_beans.RealtimeMonitoringAttribute; import java.time.Clock; import java.util.HashMap; @@ -33,7 +30,6 @@ import java.util.logging.Logger; import static configuration.Constants.*; import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; -import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value; public class AttributeSubscription { SLORule slo_rule; @@ -45,15 +41,13 @@ public class AttributeSubscription { String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric); Logger.getGlobal().log(info_logging_level,"Starting realtime subscription at "+realtime_metric_topic_name); - BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location); - BiFunction function = (topic, message) ->{ - RealtimeMonitoringAttribute realtimeMonitoringAttribute = new RealtimeMonitoringAttribute(topic); - String metric_name_from_topic = topic.replace("eu.nebulouscloud.monitoring.realtime.",EMPTY); - synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(metric_name_from_topic)) { + BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name()); + BiFunction function = (broker_details, message) ->{ + synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(metric)) { try { - update_monitoring_attribute_value(detector,metric_name_from_topic,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue()); + detector.update_monitoring_attribute_value(metric,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue()); - Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+metric_name_from_topic+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue"))); + Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+metric+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue"))); } catch (ParseException e) { e.printStackTrace(); Logger.getGlobal().log(info_logging_level,"A parsing exception was caught while parsing message: "+message); @@ -86,10 +80,10 @@ public class AttributeSubscription { String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric); Logger.getGlobal().log(info_logging_level,"Starting forecasted metric subscription at "+forecasted_metric_topic_name); - BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location); + BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name()); - BiFunction forecasted_function = (topic,message) ->{ - String predicted_attribute_name = topic.replaceFirst("eu\\.nebulouscloud\\.monitoring\\.predicted\\.",EMPTY); + BiFunction forecasted_function = (broker_details,message) ->{ + String predicted_attribute_name = forecasted_metric_topic_name.replaceFirst("eu\\.nebulouscloud\\.monitoring\\.predicted\\.",EMPTY); HashMap> predicted_attributes = getPredicted_monitoring_attributes(); try { JSONObject json_message = (JSONObject)(new JSONParser().parse(message)); @@ -122,7 +116,7 @@ public class AttributeSubscription { } detector.ADAPTATION_TIMES_MODIFY.setValue(false); if (!detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time) && (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(targeted_prediction_time)) && ((targeted_prediction_time * 1000 - time_horizon_seconds * 1000L) > (Clock.systemUTC()).millis())) { - Logger.getGlobal().log(info_logging_level, "Adding a new targeted prediction time " + targeted_prediction_time + " expiring in "+(targeted_prediction_time*1000-System.currentTimeMillis())+" from topic "+topic); + Logger.getGlobal().log(info_logging_level, "Adding a new targeted prediction time " + targeted_prediction_time + " expiring in "+(targeted_prediction_time*1000-System.currentTimeMillis())+" from topic "+forecasted_metric_topic_name); detector.getSubcomponent_state().adaptation_times.add(targeted_prediction_time); synchronized (detector.PREDICTION_EXISTS) { detector.PREDICTION_EXISTS.setValue(true); @@ -130,10 +124,10 @@ public class AttributeSubscription { } }else { if (detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time)) { - Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it is already present"); + Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + forecasted_metric_topic_name + " as it is already present"); } else if (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(targeted_prediction_time)) { if (targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - (Clock.systemUTC()).millis() <= 0) { - Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it would expire in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()) + " milliseconds and the prediction horizon is " + time_horizon_seconds * 1000L + " milliseconds"); + Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + forecasted_metric_topic_name + " as it would expire in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()) + " milliseconds and the prediction horizon is " + time_horizon_seconds * 1000L + " milliseconds"); }else{ Logger.getGlobal().log(info_logging_level,"Adding new prediction time "+targeted_prediction_time+" which expires in " + (targeted_prediction_time * 1000 - System.currentTimeMillis())); detector.getSubcomponent_state().adaptation_times_pending_processing.add(targeted_prediction_time); diff --git a/slo-violation-detector/src/main/java/runtime/DetectorRequestMappings.java b/slo-violation-detector/src/main/java/runtime/DetectorRequestMappings.java index cae9ac8..dd4da29 100644 --- a/slo-violation-detector/src/main/java/runtime/DetectorRequestMappings.java +++ b/slo-violation-detector/src/main/java/runtime/DetectorRequestMappings.java @@ -9,7 +9,7 @@ import static configuration.Constants.default_application_name; import static runtime.Main.detectors; import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_integer_id; import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents; -import static utilities.DebugDataSubscription.debug_data_generation; +import static utilities.DebugDataSubscription.*; import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached; @RestController @RequestMapping("/api") @@ -17,7 +17,7 @@ public class DetectorRequestMappings { @RequestMapping("/add-new-detector") public static String start_new_detector_subcomponent() { - detectors.add(new DetectorSubcomponent(default_application_name,detached)); + detectors.put(default_application_name,new DetectorSubcomponent(default_application_name,detached)); return ("Spawned new SLO Detector subcomponent instance! Currently, there have been "+detector_integer_id+" detectors spawned"); } @@ -30,7 +30,7 @@ public class DetectorRequestMappings { @GetMapping("/component-statistics/detectors/{id}") public static String get_detector_subcomponent_statistics(@PathVariable String id) { String detector_name = "detector_"+id; - debug_data_generation.apply(detector_subcomponents.get(detector_name).getBrokerSubscriptionDetails(),EMPTY); + debug_data_generation.apply(detector_subcomponents.get(detector_name).getBrokerSubscriptionDetails(debug_data_trigger_topic_name),EMPTY); return DetectorSubcomponent.get_detector_subcomponent_statistics(); } } diff --git a/slo-violation-detector/src/main/java/runtime/DirectorRequestMappings.java b/slo-violation-detector/src/main/java/runtime/DirectorRequestMappings.java index dfac1d9..95e9a7d 100644 --- a/slo-violation-detector/src/main/java/runtime/DirectorRequestMappings.java +++ b/slo-violation-detector/src/main/java/runtime/DirectorRequestMappings.java @@ -10,13 +10,16 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import slo_violation_detector_engine.detector.DetectorSubcomponent; +import slo_violation_detector_engine.director.DirectorSubcomponent; +import utility_beans.BrokerSubscriptionDetails; import utility_beans.CharacterizedThread; import utility_beans.RealtimeMonitoringAttribute; import java.util.HashMap; -import static configuration.Constants.default_application_name; +import static configuration.Constants.*; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; +import static slo_violation_detector_engine.generic.ComponentState.*; @RestController @RequestMapping("/api") @@ -32,9 +35,8 @@ public class DirectorRequestMappings { } catch (ParseException e) { return "Error in parsing the input string, the exception message follows:\n"+e; } - application_name = (String) rule_representation_json.get("name"); - DetectorSubcomponent new_detector = DetectorSubcomponent.detector_subcomponents.getOrDefault(application_name,new DetectorSubcomponent(default_application_name,CharacterizedThread.CharacterizedThreadRunMode.detached)); - new_detector.slo_rule_topic_subscriber_function.apply(Constants.slo_rules_topic,string_rule_representation); + BrokerSubscriptionDetails broker_details = new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password,nebulous_components_application,slo_rules_topic); + DirectorSubcomponent.slo_rule_topic_subscriber_function.apply(broker_details,string_rule_representation); return ("New application was spawned"); } @@ -58,19 +60,39 @@ public class DirectorRequestMappings { for (Object metric : metrics_json_array){ JSONObject metric_json = (JSONObject) metric; String metric_name = (String) metric_json.get("name"); + + RealtimeMonitoringAttribute.AttributeValuesType lower_bound_type,upper_bound_type; + double upper_bound = 100.0,lower_bound = 0.0; - if (((String) metric_json.get("upper_bound")).toLowerCase().contains("-inf")){ + if (((String) metric_json.get("upper_bound")).toLowerCase().contains("-inf") || ((String) metric_json.get("upper_bound")).toLowerCase().contains("-infinity")){ upper_bound = Double.NEGATIVE_INFINITY; - }else if (((String) metric_json.get("upper_bound")).toLowerCase().contains("inf")){ + }else if (((String) metric_json.get("upper_bound")).toLowerCase().contains("inf") || ((String) metric_json.get("upper_bound")).toLowerCase().contains("infinity")){ upper_bound = Double.NEGATIVE_INFINITY; + }else{ + String upper_bound_str = (String) metric_json.get("upper_bound"); + try { + upper_bound = Integer.parseInt(upper_bound_str); + upper_bound_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer; + }catch (Exception e){ + try{ + upper_bound = Double.parseDouble(upper_bound_str); + upper_bound_type = RealtimeMonitoringAttribute.AttributeValuesType.Double; + }catch (Exception z){ + e.printStackTrace(); + z.printStackTrace(); + } + } } - if (((String) metric_json.get("lower_bound")).toLowerCase().contains("-inf")){ + + + if (((String) metric_json.get("lower_bound")).toLowerCase().contains("-inf") || ((String) metric_json.get("lower_bound")).toLowerCase().contains("-infinity")){ lower_bound = Double.POSITIVE_INFINITY; } - else if (((String) metric_json.get("lower_bound")).toLowerCase().contains("inf")){ + else if (((String) metric_json.get("lower_bound")).toLowerCase().contains("inf") || ((String) metric_json.get("lower_bound")).toLowerCase().contains("infinity")){ lower_bound = Double.POSITIVE_INFINITY; + }else { + application_metrics.put(metric_name, new RealtimeMonitoringAttribute(metric_name, lower_bound, upper_bound, RealtimeMonitoringAttribute.AttributeValuesType.Double)); } - application_metrics.put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound)); } RealtimeMonitoringAttribute.initialize_monitoring_attributes(new_detector,application_metrics); diff --git a/slo-violation-detector/src/main/java/runtime/Main.java b/slo-violation-detector/src/main/java/runtime/Main.java index 86c5b11..f9bd2c2 100644 --- a/slo-violation-detector/src/main/java/runtime/Main.java +++ b/slo-violation-detector/src/main/java/runtime/Main.java @@ -14,6 +14,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import slo_violation_detector_engine.detector.DetectorSubcomponent; import slo_violation_detector_engine.director.DirectorSubcomponent; +import slo_violation_detector_engine.generic.ComponentState.*; import utility_beans.*; import java.io.IOException; @@ -24,6 +25,7 @@ import java.util.logging.Logger; import static configuration.Constants.*; import static java.util.logging.Level.INFO; +import static slo_violation_detector_engine.generic.ComponentState.*; import static utilities.OperationalModeUtils.getSLOViolationDetectionOperationalMode; import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*; import static utilities.OperationalModeUtils.get_director_subscription_topics; @@ -33,7 +35,7 @@ import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detac @SpringBootApplication public class Main { public static Long current_slo_rules_version = -1L;//initialization - public static ArrayList detectors = new ArrayList<>(); + public static HashMap detectors = new HashMap<>(); public static void main(String[] args) { //The input of this program (the SLO Violation Detector) is the type of the SLO violations which are monitored, and the predicted metric values which are evaluated. Its output are events which produce an estimate of the probability of an adaptation. @@ -72,18 +74,23 @@ public class Main { slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold")); slo_violation_determination_method = prop.getProperty("slo_violation_determination_method"); maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions")); + + broker_ip = prop.getProperty("broker_ip_url"); + broker_username = prop.getProperty("broker_username"); + broker_password = prop.getProperty("broker_password"); + //director_subscription_topics = get_director_subscription_topics(); DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,detached); - detectors.add(detector); + detectors.put(default_application_name,detector); ArrayList unbounded_metric_strings = new ArrayList<>(Arrays.asList(prop.getProperty("metrics_bounds").split(","))); for (String metric_string : unbounded_metric_strings) { detector.getSubcomponent_state().getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";", 2)[1]); //TODO delete once this information is successfully received from the AMQP broker } } //initialization if (operational_mode.equals(OperationalMode.DETECTOR)) { - Logger.getGlobal().log(INFO,"Starting new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector + Logger.getGlobal().log(INFO,"Started new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector }else if (operational_mode.equals(OperationalMode.DIRECTOR)){ - Logger.getGlobal().log(INFO,"Starting new Director and new Detector instance"); + Logger.getGlobal().log(INFO,"Starting new Director along the new Detector instance"); DirectorSubcomponent director = new DirectorSubcomponent(); SpringApplication.run(Main.class, args); Logger.getGlobal().log(INFO,"Execution completed"); diff --git a/slo-violation-detector/src/main/java/slo_rule_modelling/SLORule.java b/slo-violation-detector/src/main/java/slo_rule_modelling/SLORule.java index 1ad69ef..ff0f124 100644 --- a/slo-violation-detector/src/main/java/slo_rule_modelling/SLORule.java +++ b/slo-violation-detector/src/main/java/slo_rule_modelling/SLORule.java @@ -20,6 +20,7 @@ import utility_beans.PredictedMonitoringAttribute; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,7 +64,7 @@ public class SLORule { for (String metric: metric_list) { RealtimeMonitoringAttribute monitoring_attribute; if (!associated_detector.getSubcomponent_state().getMonitoring_attributes().containsKey(metric)){ - monitoring_attribute = new RealtimeMonitoringAttribute(metric); + monitoring_attribute = new RealtimeMonitoringAttribute(metric,false, RealtimeMonitoringAttribute.AttributeValuesType.Unknown); associated_detector.getSubcomponent_state().getMonitoring_attributes().put(metric,monitoring_attribute); } monitoring_attributes.add(metric); @@ -199,7 +200,7 @@ public class SLORule { //subrules_json_array.add } if (composite_rule){ - ArrayList individual_severity_contributions = new ArrayList<>(); + ArrayList individual_severity_contributions = new ArrayList<>(); boolean and_subrules_invalidated = false; for (Object subrule: subrules_json_array) { JSONObject json_subrule = (JSONObject) subrule; @@ -239,7 +240,7 @@ public class SLORule { rule_result_value = MathUtils.get_average(individual_severity_contributions); calculation_logging_string.append("Calculating average of individual severity contributions: ").append(individual_severity_contributions).append(" equals ").append(rule_result_value).append("\n"); }else if (slo_violation_determination_method.equals("prconf-delta") && individual_severity_contributions.size()>0){ - rule_result_value = Math.sqrt(MathUtils.sum(individual_severity_contributions.stream().map(x->x*x).collect(Collectors.toList())))/Math.sqrt(individual_severity_contributions.size()); + rule_result_value = Math.sqrt(MathUtils.sum(individual_severity_contributions.stream().map(x->x.doubleValue()*x.doubleValue()).collect(Collectors.toList())))/Math.sqrt(individual_severity_contributions.size()); calculation_logging_string.append("Calculating square root of sum of individual severity contributions: ").append(individual_severity_contributions).append(" - the result is ").append(rule_result_value).append("\n"); } diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponent.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponent.java index ce37971..8d192df 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponent.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponent.java @@ -1,27 +1,26 @@ package slo_violation_detector_engine.detector; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; +import org.apache.commons.collections4.queue.CircularFifoQueue; import slo_violation_detector_engine.generic.Runnables; import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent; import utility_beans.*; -import java.time.Clock; +import java.util.Collections; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; import java.util.logging.Logger; import static configuration.Constants.*; -import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*; +import static slo_violation_detector_engine.generic.ComponentState.prop; import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.attached; +import static utility_beans.RealtimeMonitoringAttribute.aggregate_metric_values; public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent { public static final SynchronizedInteger detector_integer_id = new SynchronizedInteger(); - public static HashMap detector_subcomponents = new HashMap<>(); //A HashMap containing all detector subcomponents + public static Map detector_subcomponents = Collections.synchronizedMap(new HashMap<>()); //A HashMap containing all detector subcomponents private DetectorSubcomponentState subcomponent_state; public final AtomicBoolean stop_signal = new AtomicBoolean(false); public final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false); @@ -35,6 +34,7 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent { public Long last_processed_adaptation_time = -1L;//initialization private String detector_name; + private String handled_application_name; private static String broker_ip = prop.getProperty("broker_ip_url"); private static String broker_username = prop.getProperty("broker_username"); private static String broker_password = prop.getProperty("broker_password"); @@ -53,7 +53,8 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent { detector_integer_id.setValue(detector_integer_id.getValue()+1); current_detector_id = detector_integer_id.getValue(); //detector_integer_id.notify(); - detector_name = "detector_"+current_detector_id; + handled_application_name = application_name; + detector_name = "detector_"+application_name+"_"+current_detector_id; } if (characterized_thread_run_mode.equals(attached)) { DetectorSubcomponentUtilities.run_slo_violation_detection_engine(this); @@ -63,75 +64,43 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent { detector_subcomponents.put(detector_name,this); } - public BiFunction slo_rule_topic_subscriber_function = (topic, message) -> { - synchronized (can_modify_slo_rules) { - can_modify_slo_rules.setValue(true); - MESSAGE_CONTENTS.assign_value(topic, message); - slo_rule_arrived.set(true); - can_modify_slo_rules.notifyAll(); + public void update_monitoring_attribute_value(String name,Number value){ + if(getSubcomponent_state().getMonitoring_attributes().get(name)==null){ - Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic); - - } - return topic + ":MSG:" + message; - }; - - public BiFunction metric_list_subscriber_function = (topic, message) -> { - synchronized (can_modify_monitoring_metrics) { - can_modify_monitoring_metrics.setValue(true); - MESSAGE_CONTENTS.assign_value(topic, message); - //TODO add monitoring metrics bounds - String metric_name; - double lower_bound,upper_bound; - JSONParser parser = new JSONParser(); - JSONObject metric_list_object; - try { - metric_list_object = (JSONObject) parser.parse(message); - for (Object element : (JSONArray) metric_list_object.get("metric_list")){ - metric_name = (String)((JSONObject)element).get("name"); - String lower_bound_str = (String)((JSONObject)element).get("lower_bound"); - String upper_bound_str = (String)((JSONObject)element).get("upper_bound"); - if (!(lower_bound_str.toLowerCase().equals("-inf") || lower_bound_str.toLowerCase().equals("-infinity"))){ - lower_bound = Double.parseDouble(lower_bound_str); - }else{ - lower_bound = Double.NEGATIVE_INFINITY; - } - - if (!(upper_bound_str.toLowerCase().equals("inf") || upper_bound_str.toLowerCase().equals("infinity"))){ - upper_bound = Double.parseDouble(upper_bound_str); - }else{ - upper_bound = Double.POSITIVE_INFINITY; - } - - subcomponent_state.getMonitoring_attributes().put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound)); - } - }catch (Exception e){ - e.printStackTrace(); + RealtimeMonitoringAttribute.AttributeValuesType attribute_type; + if (value instanceof Integer){ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer; + }else if (value instanceof Double){ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Double; + }else{ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Unknown; } + getSubcomponent_state().getMonitoring_attributes().put(name,new RealtimeMonitoringAttribute(name,false,attribute_type)); + //monitoring_attributes_max_values.put(name,value); + //monitoring_attributes_min_values.put(name,value); - //slo_rule_arrived.set(true); - can_modify_monitoring_metrics.notifyAll(); - - Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic); - + }else if (getSubcomponent_state().getMonitoring_attributes().get(name).getType()==RealtimeMonitoringAttribute.AttributeValuesType.Unknown){ + RealtimeMonitoringAttribute.AttributeValuesType attribute_type; + if (value instanceof Integer){ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer; + }else if (value instanceof Double){ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Double; + }else{ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Unknown; + } + getSubcomponent_state().getMonitoring_attributes().get(name).setType(attribute_type); } - return "Monitoring metrics message processed"; - }; - public static BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location); - public static BiFunction device_lost_subscriber_function = (topic, message) -> { - BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location); - Clock clock = Clock.systemUTC(); - Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0); - JSONObject severity_json = new JSONObject(); - severity_json.put("severity", 100); - severity_json.put("probability", 100); - severity_json.put("predictionTime", current_time_seconds); - persistent_publisher.publish(severity_json.toJSONString()); - - //TODO possibly necessary to remove the next adaptation time as it will probably not be possible to start an adaptation during it - return topic + ":MSG:" + message; - }; + getSubcomponent_state().getMonitoring_attributes().get(name).getActual_metric_values().add(value); + getSubcomponent_state().getMonitoring_attributes_statistics().get(name).update_attribute_statistics(value); + /* + if(get_90th_percentile_high_value(name,value)>monitoring_attributes_max_values.get(name)){ + monitoring_attributes_max_values.put(name,value); + }else if (get_90th_percentile_low_value(name,value) actual_metric_values = getSubcomponent_state().getMonitoring_attributes().get(metric_name).getActual_metric_values(); + if (actual_metric_values.size()==0){ + Logger.getGlobal().log(warning_logging_level,"Trying to retrieve realtime values from an empty queue for metric "+metric_name); + } + return aggregate_metric_values(actual_metric_values); + } + + public static DetectorSubcomponent get_associated_detector(String application_name){ + DetectorSubcomponent associated_detector = detector_subcomponents.get(application_name); + if (associated_detector==null){ + if (detector_subcomponents.size()==1 && detector_subcomponents.get(default_application_name)!=null){//This means only the initial 'default' application exists + associated_detector = detector_subcomponents.get(default_application_name); + associated_detector.set_name(application_name); + } + else { + associated_detector = new DetectorSubcomponent(application_name, CharacterizedThread.CharacterizedThreadRunMode.detached); + } + } + return associated_detector; + } } diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentState.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentState.java index 754e06b..9b4239b 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentState.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentState.java @@ -11,7 +11,7 @@ import java.util.HashSet; import static configuration.Constants.kept_values_per_metric; -public class DetectorSubcomponentState { +public class DetectorSubcomponentState{ private HashMap monitoring_attributes_statistics = new HashMap<>(); private HashMap monitoring_attributes_roc_statistics = new HashMap<>(); private HashMap monitoring_attributes = new HashMap<>(); diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentUtilities.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentUtilities.java index a86ebce..0eacd37 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentUtilities.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/detector/DetectorSubcomponentUtilities.java @@ -1,5 +1,6 @@ package slo_violation_detector_engine.detector; +import groovy.util.logging.Log; import metric_retrieval.AttributeSubscription; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -23,8 +24,8 @@ import java.util.logging.Logger; import java.util.stream.Collectors; import static configuration.Constants.*; -import static slo_violation_detector_engine.detector.DetectorSubcomponent.device_lost_subscriber; -import static slo_violation_detector_engine.detector.DetectorSubcomponent.device_lost_subscriber_function; +import static slo_violation_detector_engine.director.DirectorSubcomponent.MESSAGE_CONTENTS; +import static slo_violation_detector_engine.generic.ComponentState.prop; import static slo_violation_detector_engine.generic.Runnables.get_severity_calculation_runnable; import static runtime.Main.*; import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*; @@ -261,105 +262,18 @@ public class DetectorSubcomponentUtilities { public static void run_slo_violation_detection_engine(DetectorSubcomponent associated_detector_subcomponent) { while (true) { - if (first_run){ - //Creation of threads that should always run and are independent of the monitored application. - //1. Creation of the metric list input subscriber thread, which listens for the metrics to be considered - //2. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered - //3. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device - - //Metric list subscription thread - BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); - Runnable metric_list_topic_subscriber_runnable = () -> { - boolean did_not_finish_execution_gracefully = true; - while (did_not_finish_execution_gracefully) { - int exit_status = metric_list_subscriber.subscribe(associated_detector_subcomponent.metric_list_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals - if (exit_status!=0) { - Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds"); - try { - Thread.sleep(10000); - } catch (InterruptedException i) { - Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker"); - } - }else{ - did_not_finish_execution_gracefully = false; - } - } - associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]); - }; - CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread); - - - - //SLO rule subscription thread - BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); - Runnable slo_rules_topic_subscriber_runnable = () -> { - boolean did_not_finish_execution_gracefully = true; - while (did_not_finish_execution_gracefully) { - int exit_status = slo_rule_topic_subscriber.subscribe(associated_detector_subcomponent.slo_rule_topic_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals - if (exit_status!=0) { - Logger.getGlobal().log(info_logging_level, "Broker unavailable, will try to reconnect after 10 seconds"); - try { - Thread.sleep(10000); - } catch (InterruptedException i) { - Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker"); - } - }else{ - did_not_finish_execution_gracefully = false; - } - } - associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]); - }; - CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.persistent_running_detector_thread); - - - //Implementation of 'Lost edge device' thread - - Runnable device_lost_topic_subscriber_runnable = () -> { - boolean did_not_finish_execution_gracefully = true; - while (did_not_finish_execution_gracefully) { - int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false) - if (exit_status!=0) { - Logger.getGlobal().log(info_logging_level, "A device used by the platform was lost, will therefore trigger a reconfiguration"); - try { - Thread.sleep(10000); - } catch (InterruptedException i) { - Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker"); - } - }else{ - did_not_finish_execution_gracefully = false; - } - } - associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]); - }; - - CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread); - - - if (self_publish_rule_file) { - String json_file_name = prop.getProperty("input_file"); - String rules_json_string = null; - try { - rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); - } catch (IOException e) { - throw new RuntimeException(e); - } - BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); - publisher.publish(rules_json_string); - Logger.getGlobal().log(info_logging_level, "Sent message\n" + rules_json_string); - } - } - first_run = false; + Logger.getGlobal().log(info_logging_level,"Initializing new slo violation detection engine for "+associated_detector_subcomponent.get_name()); synchronized (associated_detector_subcomponent.can_modify_slo_rules) { - do { + while((!associated_detector_subcomponent.can_modify_slo_rules.getValue()) || (!associated_detector_subcomponent.slo_rule_arrived.get())){ try { associated_detector_subcomponent.can_modify_slo_rules.wait(); }catch (InterruptedException i){ i.printStackTrace(); } - }while((!associated_detector_subcomponent.can_modify_slo_rules.getValue()) || (!associated_detector_subcomponent.slo_rule_arrived.get())); + } associated_detector_subcomponent.can_modify_slo_rules.setValue(false); associated_detector_subcomponent.slo_rule_arrived.set(false); - String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic); + String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(associated_detector_subcomponent.get_application_name(),slo_rules_topic); if (slo_rule_arrived_has_updated_version(rule_representation)) { if (single_slo_rule_active) { associated_detector_subcomponent.getSubcomponent_state().slo_rules.clear(); @@ -378,8 +292,8 @@ public class DetectorSubcomponentUtilities { continue; } } - stop_all_running_threads(associated_detector_subcomponent); + Logger.getGlobal().log(info_logging_level,"Initializing debug instance for detector component "+associated_detector_subcomponent.get_name()); DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password"),associated_detector_subcomponent); initialize_monitoring_datastructures_with_empty_data(associated_detector_subcomponent.getSubcomponent_state().slo_rules); // diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/director/DirectorSubcomponent.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/director/DirectorSubcomponent.java index 68b648d..4a312d9 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/director/DirectorSubcomponent.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/director/DirectorSubcomponent.java @@ -1,11 +1,28 @@ package slo_violation_detector_engine.director; import eu.nebulouscloud.exn.Connector; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import slo_violation_detector_engine.detector.DetectorSubcomponent; import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent; -import utility_beans.CharacterizedThread; +import utility_beans.*; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.NumberFormat; +import java.time.Clock; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.logging.Logger; +import static configuration.Constants.*; +import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents; +import static slo_violation_detector_engine.detector.DetectorSubcomponent.get_associated_detector; +import static slo_violation_detector_engine.generic.ComponentState.*; import static utilities.OperationalModeUtils.get_director_publishing_topics; import static utilities.OperationalModeUtils.get_director_subscription_topics; @@ -15,6 +32,11 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent { private Integer id = 1; public static HashMap director_subcomponents = new HashMap<>(); private static DirectorSubcomponent master_director; + public static boolean first_run = true; + public final AtomicBoolean stop_signal = new AtomicBoolean(false); + public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap(); + public final SynchronizedBoolean can_modify_monitoring_metrics = new SynchronizedBoolean(false); + private String director_name; public static DirectorSubcomponent getMaster_director() { @@ -35,6 +57,115 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent { } private void create_director_topic_subscribers(){ + if (first_run){ + //Creation of threads that should always run and are independent of the monitored application. + //1. Creation of the metric list input subscriber thread, which listens for the metrics to be considered + //2. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered + //3. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device + + //Metric list subscription thread + BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,nebulous_components_application); + Runnable metric_list_topic_subscriber_runnable = () -> { + boolean did_not_finish_execution_gracefully = true; + while (did_not_finish_execution_gracefully) { + int exit_status = metric_list_subscriber.subscribe(metric_list_subscriber_function, this.stop_signal); //This subscriber should not be immune to stop signals + if (exit_status!=0) { + Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds"); + try { + Thread.sleep(10000); + } catch (InterruptedException i) { + Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker"); + } + }else{ + did_not_finish_execution_gracefully = false; + } + } + persistent_running_director_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]); + }; + CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread); + + + + //SLO rule subscription thread + BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,nebulous_components_application); + Runnable slo_rules_topic_subscriber_runnable = () -> { + boolean did_not_finish_execution_gracefully = true; + while (did_not_finish_execution_gracefully) { + int exit_status = slo_rule_topic_subscriber.subscribe(slo_rule_topic_subscriber_function, stop_signal); //This subscriber should not be immune to stop signals + if (exit_status!=0) { + Logger.getGlobal().log(info_logging_level, "Broker unavailable, will try to reconnect after 10 seconds"); + try { + Thread.sleep(10000); + } catch (InterruptedException i) { + Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker"); + } + }else{ + did_not_finish_execution_gracefully = false; + } + } + persistent_running_director_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]); + }; + CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread); + + + + + + BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location,nebulous_components_application); + BiFunction device_lost_subscriber_function = (broker_details, message) -> { + BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location); + + Clock clock = Clock.systemUTC(); + Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0); + JSONObject severity_json = new JSONObject(); + severity_json.put("severity", 100); + severity_json.put("probability", 100); + severity_json.put("predictionTime", current_time_seconds); + persistent_publisher.publish(severity_json.toJSONString()); + + return topic_for_lost_device_announcement + ":MSG:" + message; + }; + + + //Implementation of 'Lost edge device' thread + + Runnable device_lost_topic_subscriber_runnable = () -> { + boolean did_not_finish_execution_gracefully = true; + while (did_not_finish_execution_gracefully) { + int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false) + if (exit_status!=0) { + Logger.getGlobal().log(info_logging_level, "A device used by the platform was lost, will therefore trigger a reconfiguration"); + try { + Thread.sleep(10000); + } catch (InterruptedException i) { + Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker"); + } + }else{ + did_not_finish_execution_gracefully = false; + } + } + persistent_running_director_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]); + }; + + CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread); + + + if (self_publish_rule_file) { + String json_file_name = prop.getProperty("input_file"); + String rules_json_string = null; + try { + rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + publisher.publish(rules_json_string); + Logger.getGlobal().log(info_logging_level, "Sent message\n" + rules_json_string); + } + } + first_run = false; + + for (String subscription_topic : get_director_subscription_topics()){ //TODO subscribe to each topic, creating a Characterized thread for each of them @@ -45,6 +176,93 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent { //subscribing_connector = new Connector("slovid_director",) } + public BiFunction metric_list_subscriber_function = (broker_details, message) -> { + DetectorSubcomponent associated_detector = get_associated_detector(broker_details.getApplication_name()); + synchronized (can_modify_monitoring_metrics) { + can_modify_monitoring_metrics.setValue(true); + MESSAGE_CONTENTS.assign_value(broker_details.getApplication_name(),metric_list_topic, message); + String metric_name; + double lower_bound,upper_bound; + JSONParser parser = new JSONParser(); + JSONObject metric_list_object; + try { + metric_list_object = (JSONObject) parser.parse(message); + for (Object element : (JSONArray) metric_list_object.get("metric_list")){ + metric_name = (String)((JSONObject)element).get("name"); + String lower_bound_str = (String)((JSONObject)element).get("lower_bound"); + String upper_bound_str = (String)((JSONObject)element).get("upper_bound"); + NumberFormat numberFormat = NumberFormat.getInstance(); + Number lower_bound_number, upper_bound_number; + boolean is_lower_bound_integer=false,is_lower_bound_double=false; + boolean is_upper_bound_integer = false,is_upper_bound_double=false; + if (!(lower_bound_str.equalsIgnoreCase("-inf") || lower_bound_str.equalsIgnoreCase("-infinity"))){ + lower_bound_number = numberFormat.parse(lower_bound_str); + if (lower_bound_number instanceof Integer){ + is_lower_bound_integer = true; + is_lower_bound_double = false; + }else if (lower_bound_number instanceof Double){ + is_lower_bound_double = true; + is_lower_bound_integer = false; + } + lower_bound = lower_bound_number.doubleValue(); + }else{ + lower_bound = Double.NEGATIVE_INFINITY; + } + + if (!(upper_bound_str.equalsIgnoreCase("inf") || upper_bound_str.equalsIgnoreCase("infinity"))){ + upper_bound_number = numberFormat.parse(upper_bound_str); + if (upper_bound_number instanceof Integer){ + is_upper_bound_integer = true; + is_upper_bound_double = false; + }else if (upper_bound_number instanceof Double){ + is_upper_bound_double = true; + is_upper_bound_integer = false; + } + upper_bound = upper_bound_number.doubleValue(); + }else{ + upper_bound = Double.POSITIVE_INFINITY; + } + RealtimeMonitoringAttribute.AttributeValuesType attribute_type; + if (is_upper_bound_integer && is_lower_bound_integer){ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer; + }else if (is_lower_bound_double || is_upper_bound_double){ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Double; + }else{ + attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Unknown; + } + + associated_detector.getSubcomponent_state().getMonitoring_attributes().put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound,attribute_type)); + } + }catch (Exception e){ + e.printStackTrace(); + } + + //slo_rule_arrived.set(true); + can_modify_monitoring_metrics.notifyAll(); + + Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + metric_list_topic); + + } + return "Monitoring metrics message processed"; + }; + + public static BiFunction slo_rule_topic_subscriber_function = (broker_details, message) -> { + //DetectorSubcomponent new_detector = new DetectorSubcomponent(application, CharacterizedThread.CharacterizedThreadRunMode.detached); + String application = broker_details.getApplication_name(); + DetectorSubcomponent new_detector = get_associated_detector(application); + detector_subcomponents.put(application,new_detector); + synchronized (new_detector.can_modify_slo_rules) { + new_detector.can_modify_slo_rules.setValue(true); + MESSAGE_CONTENTS.assign_value(application,slo_rules_topic, message); + new_detector.slo_rule_arrived.set(true); + new_detector.can_modify_slo_rules.notifyAll(); + + Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + slo_rules_topic); + + } + return slo_rules_topic + ":MSG:" + message; + }; + @Override public String get_name() { return director_name; diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/ComponentState.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/ComponentState.java new file mode 100644 index 0000000..6a3d576 --- /dev/null +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/ComponentState.java @@ -0,0 +1,11 @@ +package slo_violation_detector_engine.generic; + +import java.util.Properties; + +public class ComponentState { + + public static Properties prop = new Properties(); + public static String broker_ip ="localhost"; + public static String broker_username= "admin"; + public static String broker_password= "admin"; +} diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java index 04791c9..8437039 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/Runnables.java @@ -7,10 +7,12 @@ import utility_beans.BrokerPublisher; import org.json.simple.JSONObject; import slo_rule_modelling.SLORule; import utility_beans.BrokerSubscriber; +import utility_beans.BrokerSubscriptionDetails; import utility_beans.CharacterizedThread; import java.sql.Timestamp; import java.time.Clock; +import java.util.Collections; import java.util.Date; import java.util.NoSuchElementException; import java.util.logging.Logger; @@ -18,24 +20,27 @@ import java.util.logging.Logger; import static configuration.Constants.*; import static java.lang.Thread.sleep; import static slo_rule_modelling.SLORule.process_rule_value; -import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*; +import static slo_violation_detector_engine.generic.ComponentState.*; import static slo_violation_detector_engine.detector.DetectorSubcomponentUtilities.*; import static utilities.DebugDataSubscription.*; public class Runnables { public static class DebugDataRunnable implements Runnable{ - DetectorSubcomponent detector; + private DetectorSubcomponent detector; //TODO Verify whether or not we need this message per detector or on a detector-independent, all-application way public DebugDataRunnable(DetectorSubcomponent detector){ this.detector = detector; } @Override public void run() { try { + Logger.getGlobal().log(info_logging_level,"Starting to subscribe to debug output trigger"); synchronized (detector.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) { //if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_topic_name).getValue()) - debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, detector.getBrokerSubscriptionDetails().getBroker_ip(),detector.getBrokerSubscriptionDetails().getBroker_username(),detector.getBrokerSubscriptionDetails().getBroker_password(), amq_library_configuration_location); + BrokerSubscriptionDetails broker_details = detector.getBrokerSubscriptionDetails(debug_data_trigger_topic_name); + debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_details.getBroker_ip(),broker_details.getBroker_username(),broker_details.getBroker_password(), amq_library_configuration_location,detector.get_application_name()); debug_data_subscriber.subscribe(debug_data_generation, detector.stop_signal); + Logger.getGlobal().log(info_logging_level,"Debug data subscriber initiated"); } if (Thread.interrupted()) { throw new InterruptedException(); @@ -67,7 +72,7 @@ public class Runnables { public static Runnable get_severity_calculation_runnable(SLORule rule, DetectorSubcomponent detector) { Runnable severity_calculation_runnable = () -> { - BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location); + BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip,broker_username,broker_password, amq_library_configuration_location); while (!detector.stop_signal.get()) { synchronized (detector.PREDICTION_EXISTS) { @@ -137,7 +142,7 @@ public class Runnables { severity_json.put("severity", rule_severity); severity_json.put("probability", slo_violation_probability); severity_json.put("predictionTime", targeted_prediction_time); - persistent_publisher.publish(severity_json.toJSONString()); + persistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(detector.get_application_name())); } detector.getSubcomponent_state().slo_violation_event_recording_queue.add(System.currentTimeMillis()); diff --git a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java index 92a4248..ee21c31 100644 --- a/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java +++ b/slo-violation-detector/src/main/java/slo_violation_detector_engine/generic/SLOViolationDetectorStateUtils.java @@ -16,9 +16,7 @@ public class SLOViolationDetectorStateUtils { private static String self_starting_command_string = "java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar > $LOG_FILE 2>&1"; public static OperationalMode operational_mode; public final SynchronizedInteger create_new_slo_detector = new SynchronizedInteger(0); - public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap(); - public static Properties prop = new Properties(); public static InputStream getPreferencesFileInputStream(String custom_properties_file_path) throws IOException { diff --git a/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java b/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java index 1e5c74d..6570fba 100644 --- a/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java +++ b/slo-violation-detector/src/main/java/utilities/DebugDataSubscription.java @@ -16,7 +16,6 @@ import java.util.logging.Logger; import static configuration.Constants.*; import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents; import static slo_violation_detector_engine.director.DirectorSubcomponent.director_subcomponents; -import static utility_beans.RealtimeMonitoringAttribute.get_metric_value; /** * The objective of this class is to allow a structured synopsis of the current state of the SLO Violation Detector to be created, as a response to a request sent to it through an appropriate topic. @@ -25,7 +24,6 @@ public class DebugDataSubscription { public static String debug_data_trigger_topic_name = "sloviolationdetector.debug"; public static String debug_data_output_topic_name = "sloviolationdetector.debug_output"; - private static String broker_username,broker_password,broker_ip_address; public static BiFunction debug_data_generation = (broker_subscription_details, message) ->{ String output_debug_data = ""; @@ -93,8 +91,8 @@ public class DebugDataSubscription { intermediate_debug_string.append("\t- Metric name: ").append(entry.getKey()); } - Double metric_value = get_metric_value(detector,entry.getKey()); - CircularFifoQueue metric_values = entry.getValue().getActual_metric_values(); + Double metric_value = detector.get_metric_value(entry.getKey()); + CircularFifoQueue metric_values = entry.getValue().getActual_metric_values(); if (metric_value.isNaN()) { intermediate_debug_string.append(" - value was determined as NaN, individual collected values are ").append(metric_values).append("\n"); } else if (metric_value.isInfinite()) { diff --git a/slo-violation-detector/src/main/java/utilities/MathUtils.java b/slo-violation-detector/src/main/java/utilities/MathUtils.java index d389d1f..45cbe2e 100644 --- a/slo-violation-detector/src/main/java/utilities/MathUtils.java +++ b/slo-violation-detector/src/main/java/utilities/MathUtils.java @@ -12,11 +12,16 @@ package utilities; import java.util.List; public class MathUtils { - public static double get_average(Iterable values){ + + public static double get_average(Iterable values){ double sum = 0; int counter = 0; - for (Double value : values){ - sum = sum+value; + for (Number value :values){ + if (value instanceof Double){ + sum = sum+(Double)value; + } else if (value instanceof Integer) { + sum = sum+(Integer)value; + } counter++; } return (sum/counter); diff --git a/slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java b/slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java index c634cb7..ed13987 100644 --- a/slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java +++ b/slo-violation-detector/src/main/java/utilities/MonitoringAttributeUtilities.java @@ -48,7 +48,7 @@ public class MonitoringAttributeUtilities { detector_state.getMonitoring_attributes().remove(monitoring_metric_name); } - detector_state.getMonitoring_attributes().put(monitoring_metric_name, new RealtimeMonitoringAttribute(monitoring_metric_name)); + detector_state.getMonitoring_attributes().put(monitoring_metric_name, new RealtimeMonitoringAttribute(monitoring_metric_name,false, RealtimeMonitoringAttribute.AttributeValuesType.Unknown)); detector_state.getMonitoring_attributes_roc_statistics().put(monitoring_metric_name,new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound diff --git a/slo-violation-detector/src/main/java/utility_beans/BrokerPublisher.java b/slo-violation-detector/src/main/java/utility_beans/BrokerPublisher.java index 3f21222..fc8d027 100644 --- a/slo-violation-detector/src/main/java/utility_beans/BrokerPublisher.java +++ b/slo-violation-detector/src/main/java/utility_beans/BrokerPublisher.java @@ -8,6 +8,7 @@ import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.lang.reflect.Array; import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -15,6 +16,7 @@ import java.util.logging.Logger; import static configuration.Constants.*; import static java.lang.Thread.sleep; import static java.util.logging.Level.INFO; +import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents; import static utilities.DebugDataSubscription.debug_data_output_topic_name; public class BrokerPublisher { @@ -89,19 +91,26 @@ public class BrokerPublisher { } } - //TODO This assumes that the only content to be sent is json-like - public void publish(String json_string_content) { - JSONParser parser = new JSONParser(); - JSONObject json_object = new JSONObject(); - try{ - json_object = (JSONObject) parser.parse(json_string_content); - }catch (ParseException p){ - Logger.getGlobal().log(Level.WARNING,"Could not parse the string content to be published to the broker as json"); - } - if (private_publisher_instance!=null) { - private_publisher_instance.send(json_object); - }else{ - Logger.getGlobal().log(Level.SEVERE,"Could not send message to AMQP broker, as the broker ip to be used has not been specified"); + //TODO The methods below assume that the only content to be sent is json-like + public void publish (String json_string_content, Iterable application_names){ + + for (String application_name : application_names) { + JSONParser parser = new JSONParser(); + JSONObject json_object = new JSONObject(); + try { + json_object = (JSONObject) parser.parse(json_string_content); + } catch (ParseException p) { + Logger.getGlobal().log(Level.WARNING, "Could not parse the string content to be published to the broker as json"); + } + if (private_publisher_instance != null) { + private_publisher_instance.send(json_object); + } else { + Logger.getGlobal().log(Level.SEVERE, "Could not send message to AMQP broker, as the broker ip to be used has not been specified"); + } } } + + public void publish(String json_string_content) { + publish(json_string_content,detector_subcomponents.keySet()); + } } diff --git a/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriber.java b/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriber.java index fe1168d..fafdf04 100644 --- a/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriber.java +++ b/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriber.java @@ -18,21 +18,25 @@ import static java.util.logging.Level.INFO; public class BrokerSubscriber { - private static class MessageProcessingHandler extends Handler{ + private class MessageProcessingHandler extends Handler{ + private BrokerSubscriptionDetails broker_details; private static final BiFunction temporary_function = (Object o, Object o2) -> { //System.out.println(""); Logger.getGlobal().log(INFO,"REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY"); return "IN_PROCESSING"; }; - private BiFunction processing_function; + private BiFunction processing_function; @Override public void onMessage(String key, String address, Map body, Message message, Context context) { - processing_function.apply(address, JSONValue.toJSONString(body)); + Logger.getGlobal().log(info_logging_level,"Handling message for address "+address); + processing_function.apply(broker_details, JSONValue.toJSONString(body)); } - public MessageProcessingHandler(){ + public MessageProcessingHandler(BrokerSubscriptionDetails broker_details){ + this.broker_details = broker_details; this.processing_function = temporary_function; } - public MessageProcessingHandler(BiFunction biFunction){ + public MessageProcessingHandler(BiFunction biFunction, BrokerSubscriptionDetails broker_details){ + this.broker_details = broker_details; this.processing_function = biFunction; } public BiFunction getProcessing_function() { @@ -49,7 +53,8 @@ public class BrokerSubscriber { private String broker_ip; private String brokerUsername; private String brokerPassword; - public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation){ + BrokerSubscriptionDetails broker_details; + public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String application_name){ boolean able_to_initialize_BrokerSubscriber = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY); if (!able_to_initialize_BrokerSubscriber){ @@ -62,6 +67,7 @@ public class BrokerSubscriber { throw new RuntimeException(e); } } + broker_details = new BrokerSubscriptionDetails(broker_ip,brokerUsername,brokerPassword,application_name,topic); boolean subscriber_configuration_changed; if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)){ HashSet topics_to_subscribe_to = new HashSet<>(); @@ -84,7 +90,7 @@ public class BrokerSubscriber { } } if (subscriber_configuration_changed){ - Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(),true,true); + Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details),true,true); active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,current_consumer); this.topic = topic; @@ -142,7 +148,7 @@ public class BrokerSubscriber { active_consumers_per_topic_per_broker_ip.put(broker_ip,new HashMap<>()); } //Then add the new consumer - Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function),true,true); + Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function,broker_details),true,true); new_consumer.setProperty("topic",topic); active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,new_consumer); add_topic_consumer_to_broker_connector(new_consumer); @@ -175,11 +181,11 @@ public class BrokerSubscriber { public static class TopicNames{ public static String realtime_metric_values_topic(String metric) { - return realtime_metrics_topic+metric; + return topic_prefix_realtime_metrics +metric; } public static String final_metric_predictions_topic(String metric) { - return final_metric_prediction_topic+metric; + return topic_prefix_final_predicted_metrics +metric; } } } diff --git a/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriptionDetails.java b/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriptionDetails.java index e367469..1a594c6 100644 --- a/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriptionDetails.java +++ b/slo-violation-detector/src/main/java/utility_beans/BrokerSubscriptionDetails.java @@ -1,16 +1,21 @@ package utility_beans; import static configuration.Constants.EMPTY; +import static configuration.Constants.default_application_name; public class BrokerSubscriptionDetails { String broker_username = "admin"; String broker_password = "admin"; String broker_ip = "localhost"; + String application_name = default_application_name; + String topic = EMPTY; - public BrokerSubscriptionDetails(String broker_ip, String broker_username, String broker_password) { + public BrokerSubscriptionDetails(String broker_ip, String broker_username, String broker_password,String application_name, String topic) { this.broker_ip = broker_ip; this.broker_username = broker_username; this.broker_password = broker_password; + this.topic = topic; + this.application_name = application_name; } public BrokerSubscriptionDetails(boolean fake_broker_subscription) { @@ -18,6 +23,8 @@ public class BrokerSubscriptionDetails { this.broker_username = EMPTY; this.broker_password = EMPTY; this.broker_ip = EMPTY; + this.topic = EMPTY; + this.application_name = EMPTY; } } @@ -44,4 +51,20 @@ public class BrokerSubscriptionDetails { public void setBroker_ip(String broker_ip) { this.broker_ip = broker_ip; } + + public String getApplication_name() { + return application_name; + } + + public void setApplication_name(String application_name) { + this.application_name = application_name; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } } diff --git a/slo-violation-detector/src/main/java/utility_beans/CustomDataPublisher.java b/slo-violation-detector/src/main/java/utility_beans/CustomDataPublisher.java index 0d99073..e38f017 100644 --- a/slo-violation-detector/src/main/java/utility_beans/CustomDataPublisher.java +++ b/slo-violation-detector/src/main/java/utility_beans/CustomDataPublisher.java @@ -15,13 +15,179 @@ import java.util.logging.Level; import java.util.logging.Logger; import static configuration.Constants.*; +import static utilities.DebugDataSubscription.debug_data_trigger_topic_name; + + +import java.util.HashMap; +import java.util.Map; public class CustomDataPublisher { + private static final Map presetTexts = new HashMap<>(); + + static { + update_event_data(); + } + + private static void update_event_data(){ + presetTexts.put("eu.nebulouscloud.monitoring.slo.new", "{\n" + + " \"name\": \"_Application1\",\n" + + " \"operator\": \"OR\",\n" + + " \"constraints\": [\n" + + " {\n" + + " \"name\": \"cpu_and_memory_or_swap_too_high\",\n" + + " \"operator\": \"AND\",\n" + + " \"constraints\": [\n" + + " {\n" + + " \"name\": \"cpu_usage_high\",\n" + + " \"metric\": \"cpu_usage\",\n" + + " \"operator\": \">\",\n" + + " \"threshold\": 80.0\n" + + " },\n" + + " {\n" + + " \"name\": \"memory_or_swap_usage_high\",\n" + + " \"operator\": \"OR\",\n" + + " \"constraints\": [\n" + + " {\n" + + " \"name\": \"memory_usage_high\",\n" + + " \"metric\": \"ram_usage\",\n" + + " \"operator\": \">\",\n" + + " \"threshold\": 70.0\n" + + " },\n" + + " {\n" + + " \"name\": \"disk_usage_high\",\n" + + " \"metric\": \"swap_usage\",\n" + + " \"operator\": \">\",\n" + + " \"threshold\": 50.0\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + "}"); + presetTexts.put("eu.nebulouscloud.monitoring.realtime.cpu_usage", + "{\n" + + " \"metricValue\": 12.34,\n" + + " \"level\": 1,\n" + + " \"component_id\":\"postgresql_1\",\n" + + " \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" + + "}\n"); + presetTexts.put("eu.nebulouscloud.monitoring.predicted.cpu_usage", "{\n" + + " \"metricValue\": 92.34,\n" + + " \"level\": 1,\n" + + " \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" + + " \"probability\": 0.98,\n" + + " \"confidence_interval\" : [8,15]\n" + + " \"predictionTime\": "+(int)(10+System.currentTimeMillis()/1000)+"\n" + + "}"); + } + private Publisher private_publisher_instance; + private String topic; + private String broker_ip; + + public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) { + + boolean publisher_configuration_changed; + ArrayList publishers = new ArrayList<>(); + private_publisher_instance = new Publisher(slovid_publisher_key,broker_topic,true,true); + publishers.add(private_publisher_instance); + + + Connector connector = new Connector("slovid", + new ConnectorHandler() { + }, publishers + , List.of(), + false, + false, + new StaticExnConfig( + broker_ip, + 5672, + brokerUsername, + brokerPassword, + 60, + EMPTY + ) + ); + connector.start(); + + } + + + public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) { + this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key); + } + + public static void main(String[] args) { + JFrame frame = new JFrame("Broker input app"); + JTextField broker_ipTextField = new JTextField("localhost", 10); + JComboBox smallTextField = new JComboBox<>(new String[]{"eu.nebulouscloud.monitoring.slo.new","eu.nebulouscloud.monitoring.realtime.cpu_usage", "eu.nebulouscloud.monitoring.predicted.cpu_usage", debug_data_trigger_topic_name}); + smallTextField.setEditable(true); + JTextField othersmallTextField = new JTextField("slovid", 10); + JTextArea largeTextArea = new JTextArea(10, 25); + JButton submitButton = new JButton("Send"); + + AtomicReference broker_ip = new AtomicReference<>(); + AtomicReference broker_topic = new AtomicReference<>(); + AtomicReference message_payload = new AtomicReference<>(); + AtomicReference publisher_key = new AtomicReference<>(); + + smallTextField.addActionListener(e -> { + update_event_data(); + String selectedOption = (String) smallTextField.getSelectedItem(); + String presetText = presetTexts.getOrDefault(selectedOption, ""); + largeTextArea.setText(presetText); + }); + + submitButton.addActionListener(e -> { + broker_ip.set(broker_ipTextField.getText()); + broker_topic.set(smallTextField.getSelectedItem().toString()); + message_payload.set(largeTextArea.getText()); + publisher_key.set(othersmallTextField.getText()); + CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(), broker_ip.toString(), "admin", "admin", EMPTY, publisher_key.toString()); + publisher.publish(message_payload.toString()); + }); + + JPanel panel = new JPanel(); + panel.add(new JLabel("Broker to publish to:")); + panel.add(broker_ipTextField); + panel.add(new JLabel("Topic to publish to:")); + panel.add(smallTextField); + panel.add(new JLabel("Key to publish with:")); + panel.add(othersmallTextField); + panel.add(new JLabel("Text to publish:")); + panel.add(new JScrollPane(largeTextArea)); + panel.add(submitButton); + + frame.add(panel); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + frame.pack(); + frame.setVisible(true); + } + + //TODO This assumes that the only content to be sent is json-like + public void publish(String json_string_content) { + JSONParser parser = new JSONParser(); + JSONObject json_object = new JSONObject(); + try{ + json_object = (JSONObject) parser.parse(json_string_content); + }catch (ParseException p){ + Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content"); + } + private_publisher_instance.send(json_object); + } + + public void publish(JSONObject json_object) { + private_publisher_instance.send(json_object); + } +} + + +class OldCustomDataPublisher { private static HashMap> broker_and_topics_to_publish_to = new HashMap<>(); private Publisher private_publisher_instance; private String topic; private String broker_ip; - public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) { + public OldCustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) { boolean publisher_configuration_changed; ArrayList publishers = new ArrayList<>(); @@ -49,7 +215,7 @@ public class CustomDataPublisher { } - public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) { + public OldCustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) { this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key); } public static void main(String[] args){ @@ -58,25 +224,30 @@ public class CustomDataPublisher { //msg.put("key","value"); JFrame frame = new JFrame("Broker input app"); + JTextField broker_ipTextField = new JTextField("localhost",10); JTextField smallTextField = new JTextField("eu.nebulouscloud.monitoring.metric_list",30); JTextField othersmallTextField = new JTextField("slovid",20); JTextArea largeTextArea = new JTextArea(10, 30); JButton submitButton = new JButton("Send"); + AtomicReference broker_ip = new AtomicReference<>(); AtomicReference broker_topic = new AtomicReference<>(); AtomicReference message_payload = new AtomicReference<>(); AtomicReference publisher_key = new AtomicReference<>(); submitButton.addActionListener(e -> { + broker_ip.set(broker_ipTextField.getText()); broker_topic.set(smallTextField.getText()); message_payload.set(largeTextArea.getText()); publisher_key.set(othersmallTextField.getText()); - CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(),"localhost","admin","admin",EMPTY,publisher_key.toString()); + OldCustomDataPublisher publisher = new OldCustomDataPublisher(broker_topic.toString(),broker_ip.toString(),"admin","admin",EMPTY,publisher_key.toString()); publisher.publish(message_payload.toString()); }); JPanel panel = new JPanel(); + panel.add(new JLabel("Broker to publish to:")); + panel.add(broker_ipTextField); panel.add(new JLabel("Topic to publish to:")); panel.add(smallTextField); panel.add(new JLabel("Key to publish with:")); diff --git a/slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeStatistics.java b/slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeStatistics.java index 112205b..f6ac446 100644 --- a/slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeStatistics.java +++ b/slo-violation-detector/src/main/java/utility_beans/MonitoringAttributeStatistics.java @@ -45,13 +45,13 @@ public class MonitoringAttributeStatistics { this.lower_bound = lower_bound; } - public void update_attribute_statistics(double new_attribute_value){ + public void update_attribute_statistics(Number new_attribute_value){ count++; - double mean_differential = (new_attribute_value - current_mean) / count; + double mean_differential = (new_attribute_value.doubleValue() - current_mean) / count; double new_mean = current_mean + mean_differential; - double dsquared_increment = (new_attribute_value - new_mean) * (new_attribute_value - current_mean); + double dsquared_increment = (new_attribute_value.doubleValue() - new_mean) * (new_attribute_value.doubleValue() - current_mean); double new_dsquared = current_dsquared + dsquared_increment; current_mean = new_mean; @@ -59,7 +59,7 @@ public class MonitoringAttributeStatistics { if (!hard_upper_bound_is_set){ if (count==1) { - upper_bound = new_attribute_value; + upper_bound = new_attribute_value.doubleValue(); }else { double candidate_upper_value = new_mean + Math.sqrt(10.0) * Math.sqrt(new_dsquared / (count - 1)); //Chebyshev-based 90th percentile value @@ -70,7 +70,7 @@ public class MonitoringAttributeStatistics { } if (!hard_lower_bound_is_set) { if (count==1){ - lower_bound = new_attribute_value; + lower_bound = new_attribute_value.doubleValue(); }else { double candidate_lower_value = new_mean - Math.sqrt(10.0) * Math.sqrt(new_dsquared / (count - 1)); //Chebyshev-based 90th percentile value //if (candidate_lower_value < lower_bound) { diff --git a/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java b/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java index 4a98809..a0ef586 100644 --- a/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java +++ b/slo-violation-detector/src/main/java/utility_beans/PredictedMonitoringAttribute.java @@ -49,7 +49,7 @@ public class PredictedMonitoringAttribute { this.initialized = true; this.name = name; this.threshold = threshold; - double current_value = RealtimeMonitoringAttribute.get_metric_value(detector,name); + double current_value = detector.get_metric_value(name); if (Double.isNaN(current_value)){ Logger.getGlobal().log(info_logging_level,"Detected NaN value for metric "+name+". Thus we cannot compute severity although a predicted value of "+forecasted_value+" has arrived"); this.initialized = false; diff --git a/slo-violation-detector/src/main/java/utility_beans/RealtimeMonitoringAttribute.java b/slo-violation-detector/src/main/java/utility_beans/RealtimeMonitoringAttribute.java index 07ea64a..a77830c 100644 --- a/slo-violation-detector/src/main/java/utility_beans/RealtimeMonitoringAttribute.java +++ b/slo-violation-detector/src/main/java/utility_beans/RealtimeMonitoringAttribute.java @@ -14,84 +14,83 @@ import utilities.MathUtils; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.logging.Logger; import static configuration.Constants.*; +import static java.lang.Integer.valueOf; import static utility_beans.PredictedMonitoringAttribute.*; public class RealtimeMonitoringAttribute { protected String name; - private Double upper_bound; - private Double lower_bound; - private CircularFifoQueue actual_metric_values = new CircularFifoQueue<>(kept_values_per_metric); //the previous actual values of the metric + public enum AttributeValuesType{Integer, Unknown, Double} + private AttributeValuesType type; + private Number upper_bound; + private Number lower_bound; + private CircularFifoQueue actual_metric_values = new CircularFifoQueue<>(kept_values_per_metric); //the previous actual values of the metric - public RealtimeMonitoringAttribute(String name, Double lower_bound, Double upper_bound){ + public RealtimeMonitoringAttribute(String name, Number lower_bound, Number upper_bound,AttributeValuesType type){ this.name = name; this.lower_bound = lower_bound; this.upper_bound = upper_bound; + this.type = type; } - public RealtimeMonitoringAttribute(String name, Collection values){ - this.name = name; + public RealtimeMonitoringAttribute(String name, Collection values,AttributeValuesType type){ this.lower_bound = 0.0; - this.upper_bound = 100.0; + this.upper_bound= 100.0; + this.name = name; + this.type = type; //Equivalent to below: values.stream().forEach(x -> actual_metric_values.add(x)); actual_metric_values.addAll(values); } - public RealtimeMonitoringAttribute(String name, Double value){ + public RealtimeMonitoringAttribute(String name, Number value){ this.name = name; - this.lower_bound = 0.0; - this.upper_bound = 100.0; + this.lower_bound = 0; + this.upper_bound= 100; + if (value instanceof Integer){ + this.type = AttributeValuesType.Integer; + }else if (value instanceof Double){ + this.type = AttributeValuesType.Double; + } actual_metric_values.add(value); } - public RealtimeMonitoringAttribute(String name){ + public RealtimeMonitoringAttribute(String name,Boolean has_infinite_bounds,AttributeValuesType type){ this.name = name; - this.lower_bound = 0.0; - this.upper_bound = 100.0; - } - - public static Double get_metric_value(DetectorSubcomponent detector, String metric_name){ - CircularFifoQueue actual_metric_values = detector.getSubcomponent_state().getMonitoring_attributes().get(metric_name).getActual_metric_values(); - if (actual_metric_values.size()==0){ - Logger.getGlobal().log(warning_logging_level,"Trying to retrieve realtime values from an empty queue for metric "+metric_name); + this.type = type; + if (has_infinite_bounds){ + if (type==AttributeValuesType.Double) { + this.upper_bound = Double.POSITIVE_INFINITY; + this.lower_bound = Double.NEGATIVE_INFINITY; + } + else if (type==AttributeValuesType.Integer){ + this.upper_bound = Integer.MAX_VALUE; + this.lower_bound = Integer.MIN_VALUE; + } + }else { + this.lower_bound = 0.0; + this.upper_bound = 100.0; } - return aggregate_metric_values(actual_metric_values); } - private static Double aggregate_metric_values(Iterable metric_values) { + public static Double aggregate_metric_values(Iterable metric_values) { return MathUtils.get_average(metric_values); } - public static void update_monitoring_attribute_value(DetectorSubcomponent detector, String name,Double value){ - if(detector.getSubcomponent_state().getMonitoring_attributes().get(name)==null){ - detector.getSubcomponent_state().getMonitoring_attributes().put(name,new RealtimeMonitoringAttribute(name)); - //monitoring_attributes_max_values.put(name,value); - //monitoring_attributes_min_values.put(name,value); - } - detector.getSubcomponent_state().getMonitoring_attributes().get(name).getActual_metric_values().add(value); - detector.getSubcomponent_state().getMonitoring_attributes_statistics().get(name).update_attribute_statistics(value); - /* - if(get_90th_percentile_high_value(name,value)>monitoring_attributes_max_values.get(name)){ - monitoring_attributes_max_values.put(name,value); - }else if (get_90th_percentile_low_value(name,value)> void initialize_monitoring_attribute_rates_of_change(DetectorSubcomponent detector, T metric_names){ initialize_monitoring_attribute_hashmap(detector.getSubcomponent_state().getMonitoring_attributes(),metric_names); - initialize_attribute_value_hashmap(getAttributes_maximum_rate_of_change(),metric_names); - initialize_attribute_value_hashmap(getAttributes_minimum_rate_of_change(),metric_names); + initialize_attribute_double_value_hashmap(getAttributes_maximum_rate_of_change(),metric_names); + initialize_attribute_double_value_hashmap(getAttributes_minimum_rate_of_change(),metric_names); } - public static > void initialize_monitoring_attribute_hashmap(HashMap map, T metric_names){ + public static > void initialize_monitoring_attribute_hashmap(HashMap map, T metric_names){ for (String metric_name : metric_names){ - map.put(metric_name,new RealtimeMonitoringAttribute(metric_name)); + map.put(metric_name,new RealtimeMonitoringAttribute(metric_name,false,AttributeValuesType.Unknown)); } } @@ -101,14 +100,14 @@ public class RealtimeMonitoringAttribute { } } - public static > void initialize_monitoring_attributes (DetectorSubcomponent detector, T metric_names_bounds){ + public static void initialize_monitoring_attributes (DetectorSubcomponent detector, HashMap metric_names_bounds){ for (String metric_name : metric_names_bounds.keySet()) { detector.getSubcomponent_state().getMonitoring_attributes_statistics().put(metric_name, new - MonitoringAttributeStatistics(metric_names_bounds.get(metric_name).lower_bound,metric_names_bounds.get(metric_name).upper_bound)); + MonitoringAttributeStatistics(metric_names_bounds.get(metric_name).lower_bound.doubleValue(),metric_names_bounds.get(metric_name).upper_bound.doubleValue())); } } - private static > void initialize_attribute_value_hashmap(HashMap hashmap ,T metric_names){ + private static > void initialize_attribute_double_value_hashmap(HashMap hashmap , T metric_names){ for (String metric_name: metric_names){ hashmap.put(metric_name,0.0); } @@ -122,27 +121,34 @@ public class RealtimeMonitoringAttribute { return name; } - public CircularFifoQueue getActual_metric_values() { + public CircularFifoQueue getActual_metric_values() { return actual_metric_values; } - public void setActual_metric_values(CircularFifoQueue actual_metric_values) { + public void setActual_metric_values(CircularFifoQueue actual_metric_values) { this.actual_metric_values = actual_metric_values; } public Double getUpper_bound() { - return upper_bound; + return upper_bound.doubleValue(); } - public void setUpper_bound(Double upper_bound) { + public void setUpper_bound(Number upper_bound) { this.upper_bound = upper_bound; } public Double getLower_bound() { - return lower_bound; + return lower_bound.doubleValue(); } - public void setLower_bound(Double lower_bound) { + public void setLower_bound(Number lower_bound) { this.lower_bound = lower_bound; } + + public void setType(AttributeValuesType type){ + this.type = type; + } + public AttributeValuesType getType(){ + return type; + } } diff --git a/slo-violation-detector/src/main/java/utility_beans/SynchronizedStringMap.java b/slo-violation-detector/src/main/java/utility_beans/SynchronizedStringMap.java index 3dd6c1c..a8108d5 100644 --- a/slo-violation-detector/src/main/java/utility_beans/SynchronizedStringMap.java +++ b/slo-violation-detector/src/main/java/utility_beans/SynchronizedStringMap.java @@ -17,17 +17,28 @@ import static configuration.Constants.EMPTY; public class SynchronizedStringMap { - private Map synchronized_map = Collections.synchronizedMap(new HashMap<>()); // using Collections.synchronized map as we intend to add/remove topics to the map dynamically - public String get_synchronized_contents(String name){ - if (synchronized_map.containsKey(name)) { - return synchronized_map.get(name); + private Map> synchronized_map = Collections.synchronizedMap(new HashMap<>()); // using Collections.synchronized map as we intend to add/remove topics to the map dynamically + public String get_synchronized_contents(String application_name, String topic_name){ + if (synchronized_map.containsKey(application_name)) { + if(synchronized_map.get(application_name).containsKey(topic_name)){ + return synchronized_map.get(application_name).get(topic_name); + }else{ + synchronized_map.get(application_name).put(topic_name,EMPTY); + return EMPTY; + } }else{ - synchronized_map.put(name,new String(EMPTY)); - return synchronized_map.get(name); + HashMap new_map = new HashMap<>(); + synchronized_map.put(application_name,Collections.synchronizedMap(new HashMap<>())); + synchronized_map.get(application_name).put(topic_name,EMPTY); + return EMPTY; } } - public String assign_value(String topic, String value){ - synchronized_map.put(topic,value); - return synchronized_map.get(topic); + public void assign_value(String application_name, String topic, String value){ + if (synchronized_map.containsKey(application_name)){ + synchronized_map.get(application_name).put(topic,value); + }else{ + synchronized_map.put(application_name,Collections.synchronizedMap(new HashMap<>())); + synchronized_map.get(application_name).put(topic,value); + } } } diff --git a/slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties b/slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties index 3f72bf3..89d63dd 100644 --- a/slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties +++ b/slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties @@ -11,5 +11,5 @@ broker_password = admin slo_violation_probability_threshold = 0.1 slo_violation_determination_method = prconf-delta -time_horizon_seconds = 120 +time_horizon_seconds = 5 maximum_acceptable_forward_predictions = 30 \ No newline at end of file diff --git a/slo-violation-detector/src/test/java/ConnectivityTests.java b/slo-violation-detector/src/test/java/ConnectivityTests.java index 2d185b3..0d5ebbc 100644 --- a/slo-violation-detector/src/test/java/ConnectivityTests.java +++ b/slo-violation-detector/src/test/java/ConnectivityTests.java @@ -46,7 +46,7 @@ public class ConnectivityTests { BrokerPublisher publisher = new BrokerPublisher("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"), amq_library_configuration_location); - BrokerSubscriber subscriber = new BrokerSubscriber("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"),amq_library_configuration_location); + BrokerSubscriber subscriber = new BrokerSubscriber("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"),amq_library_configuration_location,default_application_name); JSONObject object_to_publish = new JSONObject(); object_to_publish.put("ram","95"); diff --git a/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java b/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java index cbd3102..fe08701 100644 --- a/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java +++ b/slo-violation-detector/src/test/java/DerivedMonitoringAttributeTests.java @@ -19,7 +19,6 @@ import static configuration.Constants.roc_limit; import static utility_beans.PredictedMonitoringAttribute.getAttributes_maximum_rate_of_change; import static utility_beans.PredictedMonitoringAttribute.getAttributes_minimum_rate_of_change; import static utility_beans.RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes; -import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value; public class DerivedMonitoringAttributeTests { @@ -28,7 +27,7 @@ public class DerivedMonitoringAttributeTests { public void roc_calculation_test(){ simple_initialize_0_100_bounded_attributes(detector, List.of("cpu")); - update_monitoring_attribute_value(detector,"cpu",0.0); + detector.update_monitoring_attribute_value("cpu",0.0); detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put("cpu", new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound getAttributes_maximum_rate_of_change().put("cpu",roc_limit); diff --git a/slo-violation-detector/src/test/java/SeverityTests.java b/slo-violation-detector/src/test/java/SeverityTests.java index 20af440..d8f823f 100644 --- a/slo-violation-detector/src/test/java/SeverityTests.java +++ b/slo-violation-detector/src/test/java/SeverityTests.java @@ -36,7 +36,7 @@ public class SeverityTests { for(String monitoring_metric_name : metric_names) { detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put(monitoring_metric_name, new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound } - RealtimeMonitoringAttribute.update_monitoring_attribute_value(detector,"cpu",0.0); + detector.update_monitoring_attribute_value("cpu",0.0); PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector,"cpu",70,1,100.0,100,10,System.currentTimeMillis(),System.currentTimeMillis()+20000); @@ -61,7 +61,7 @@ public class SeverityTests { for(String monitoring_metric_name : metric_names) { detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put(monitoring_metric_name, new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound } - RealtimeMonitoringAttribute.update_monitoring_attribute_value(detector,"cpu",30.0); + detector.update_monitoring_attribute_value("cpu",30.0); PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector,"cpu",70,1,80.0,90,5,System.currentTimeMillis(),System.currentTimeMillis()+20000); @@ -86,7 +86,7 @@ public class SeverityTests { for(String monitoring_metric_name : metric_names) { detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put(monitoring_metric_name, new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound } - RealtimeMonitoringAttribute.update_monitoring_attribute_value(detector,"cpu",86.0); + detector.update_monitoring_attribute_value("cpu",86.0); PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector,"cpu",75,1,92.0,88,7.8,System.currentTimeMillis(),System.currentTimeMillis()+20000); diff --git a/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java b/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java index 73a8851..9f07203 100644 --- a/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java +++ b/slo-violation-detector/src/test/java/UnboundedMonitoringAttributeTests.java @@ -44,7 +44,6 @@ import static slo_rule_modelling.SLORule.process_rule_value; import static slo_violation_detector_engine.detector.DetectorSubcomponentUtilities.initialize_subrule_and_attribute_associations; import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached; import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes; -import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value; class MetricConfiguration{ public String name; @@ -117,11 +116,11 @@ public class UnboundedMonitoringAttributeTests { String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric_name); Logger.getGlobal().log(Level.INFO, "Starting realtime subscription at " + realtime_metric_topic_name); - BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location); + BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location,default_application_name); BiFunction function = (topic, message) -> { synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(topic)) { try { - update_monitoring_attribute_value(detector,topic, ((Number) ((JSONObject) new JSONParser().parse(message)).get("metricValue")).doubleValue()); + detector.update_monitoring_attribute_value(topic, ((Number) ((JSONObject) new JSONParser().parse(message)).get("metricValue")).doubleValue()); Logger.getGlobal().log(info_logging_level, "RECEIVED message with value for " + topic + " equal to " + (((JSONObject) new JSONParser().parse(message)).get("metricValue"))); } catch (ParseException e) { @@ -139,9 +138,8 @@ public class UnboundedMonitoringAttributeTests { String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric_name); - BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location); - BiFunction forecasted_function = (topic,message) ->{ - String predicted_attribute_name = topic.replaceFirst("prediction\\.",EMPTY); + BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,default_application_name); + BiFunction forecasted_function = (broker_details,message) ->{ HashMap> predicted_attributes = getPredicted_monitoring_attributes(); try { double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.metricValue.name())).doubleValue(); @@ -151,7 +149,7 @@ public class UnboundedMonitoringAttributeTests { double confidence_interval = ((Number)json_array_confidence_interval.get(1)).doubleValue() - ((Number)json_array_confidence_interval.get(0)).doubleValue(); long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue(); long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.predictionTime.name())).longValue(); - Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value); + Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+ metric_name +" equal to "+ forecasted_value); synchronized (detector.can_modify_slo_rules) { if(!detector.can_modify_slo_rules.getValue()) { @@ -168,12 +166,12 @@ public class UnboundedMonitoringAttributeTests { } } //predicted_attributes.get(predicted_attribute_name).clear(); - for (SLOSubRule subrule : SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(predicted_attribute_name)) { + for (SLOSubRule subrule : SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(metric_name)) { getPredicted_monitoring_attributes().computeIfAbsent(subrule.getId(), k -> new HashMap<>()); if ( (getPredicted_monitoring_attributes().get(subrule.getId()).get(targeted_prediction_time)!=null) &&(getPredicted_monitoring_attributes().get(subrule.getId()).get(targeted_prediction_time).getTimestamp()>timestamp)){ //do nothing, as in this case an older prediction has arrived for a metric delayed, and so it should be disregarded }else { - PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector, predicted_attribute_name, subrule.getThreshold(), subrule.getId(), forecasted_value, probability_confidence, confidence_interval, timestamp,targeted_prediction_time); + PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector, metric_name, subrule.getThreshold(), subrule.getId(), forecasted_value, probability_confidence, confidence_interval, timestamp,targeted_prediction_time); //predicted_attributes.get(predicted_attribute_name).add(prediction_attribute); subrule.setAssociated_predicted_monitoring_attribute(prediction_attribute); @@ -182,6 +180,7 @@ public class UnboundedMonitoringAttributeTests { } } detector.can_modify_slo_rules.setValue(true); + detector.can_modify_slo_rules.notifyAll(); } //SLOViolationCalculator.get_Severity_all_metrics_method(prediction_attribute)