Major code refactoring
Changes in the naming of the topics used for the communication of the component Better information propagation during the creation of a Broker subscriber processing function Better handling of upper and lower metric (attribute) bounds Acquisition of broker credential information from the user Improvement of number handling within metrics (allow a metric to have integer or real values) Improvement of the logic related to the initiation of a new SLO violation detection engine Transferral of generic topic subscriptions (application-independent logic) to the code of the Director Subcomponent Improvements to the CustomDataPublisher Better handling of paths for the properties files necessary for the operation of the component Improvement on the handling of infinity Change-Id: I94a6fdb4612de192c24511445f1236cdce94b366
This commit is contained in:
parent
a3e2ada8dc
commit
ac4d0744b5
@ -14,8 +14,8 @@ RUN mvn -f /home/app/pom.xml -DfailIfNoTests=false -Dtest=!UnboundedMonitoringAt
|
||||
|
||||
FROM docker.io/library/eclipse-temurin:17
|
||||
RUN mkdir -p /home/src/main/resources/
|
||||
COPY src/main/resources/config/eu.nebulous.slo_violation_detector.properties /home/src/main/resources/config/input_data.properties
|
||||
COPY src/main/resources/config/eu.melodic.event.brokerclient.properties /home/src/main/resources/config/eu.melodic.event.brokerclient.properties
|
||||
COPY src/main/resources/config/eu.nebulous.slo_violation_detector.properties /home/slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties
|
||||
COPY src/main/resources/config/eu.melodic.event.brokerclient.properties /home/slo-violation-detector/src/main/resources/config/eu.melodic.event.brokerclient.properties
|
||||
COPY --from=build /home/app/target/SLO-Violation-Detector-4.0-SNAPSHOT.jar /home/SLOSeverityCalculator-4.0-SNAPSHOT.jar
|
||||
WORKDIR /home
|
||||
ENV LOG_FILE /home/slo_violation_detector.log
|
||||
|
@ -27,18 +27,18 @@ public class Constants {
|
||||
public static URI base_project_path;
|
||||
public static String configuration_file_location = "slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties";
|
||||
public static String amq_library_configuration_location = "slo-violation-detector/src/main/resources/config/eu.melodic.event.brokerclient.properties";
|
||||
public static String topic_for_severity_announcement = "prediction.slo_severity_value";
|
||||
public static String topic_for_severity_announcement = "eu.nebulouscloud.monitoring.slo.severity_value";
|
||||
public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost";
|
||||
public static String slo_rules_topic = "eu.nebulouscloud.monitoring.slo.new";
|
||||
public static String metric_list_topic = "eu.nebulouscloud.monitoring.metric_list";
|
||||
public static String realtime_metrics_topic = "eu.nebulouscloud.monitoring.realtime.";
|
||||
public static String final_metric_prediction_topic = "eu.nebulouscloud.monitoring.predicted.";
|
||||
public static String topic_prefix_realtime_metrics = "eu.nebulouscloud.monitoring.realtime.";
|
||||
public static String topic_prefix_final_predicted_metrics = "eu.nebulouscloud.monitoring.predicted.";
|
||||
public static String nebulous_components_application = "nebulous_components_application";
|
||||
public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection
|
||||
public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation
|
||||
public static String roc_calculation_mode = "prototype";
|
||||
public static boolean self_publish_rule_file = false; //default value to be overriden
|
||||
public static boolean single_slo_rule_active = true; //default value to be overriden
|
||||
public static boolean first_run = true;
|
||||
public static double roc_limit = 1;
|
||||
public static double epsilon = 0.00000000001;
|
||||
public static Level debug_logging_level = Level.OFF;
|
||||
|
@ -12,7 +12,7 @@ package metric_retrieval;
|
||||
//import eu.melodic.event.brokerclient.templates.EventFields;
|
||||
//import eu.melodic.event.brokerclient.templates.TopicNames;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.BrokerSubscriber;
|
||||
import utility_beans.*;
|
||||
import utility_beans.BrokerSubscriber.EventFields;
|
||||
import utility_beans.BrokerSubscriber.TopicNames;
|
||||
import org.json.simple.JSONArray;
|
||||
@ -21,9 +21,6 @@ import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import slo_rule_modelling.SLORule;
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.HashMap;
|
||||
@ -33,7 +30,6 @@ import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value;
|
||||
|
||||
public class AttributeSubscription {
|
||||
SLORule slo_rule;
|
||||
@ -45,15 +41,13 @@ public class AttributeSubscription {
|
||||
|
||||
String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting realtime subscription at "+realtime_metric_topic_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
|
||||
BiFunction<String,String,String> function = (topic, message) ->{
|
||||
RealtimeMonitoringAttribute realtimeMonitoringAttribute = new RealtimeMonitoringAttribute(topic);
|
||||
String metric_name_from_topic = topic.replace("eu.nebulouscloud.monitoring.realtime.",EMPTY);
|
||||
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(metric_name_from_topic)) {
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name());
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> function = (broker_details, message) ->{
|
||||
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(metric)) {
|
||||
try {
|
||||
update_monitoring_attribute_value(detector,metric_name_from_topic,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue());
|
||||
detector.update_monitoring_attribute_value(metric,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue());
|
||||
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+metric_name_from_topic+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue")));
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+metric+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue")));
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
Logger.getGlobal().log(info_logging_level,"A parsing exception was caught while parsing message: "+message);
|
||||
@ -86,10 +80,10 @@ public class AttributeSubscription {
|
||||
|
||||
String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting forecasted metric subscription at "+forecasted_metric_topic_name);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name());
|
||||
|
||||
BiFunction<String,String,String> forecasted_function = (topic,message) ->{
|
||||
String predicted_attribute_name = topic.replaceFirst("eu\\.nebulouscloud\\.monitoring\\.predicted\\.",EMPTY);
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> forecasted_function = (broker_details,message) ->{
|
||||
String predicted_attribute_name = forecasted_metric_topic_name.replaceFirst("eu\\.nebulouscloud\\.monitoring\\.predicted\\.",EMPTY);
|
||||
HashMap<Integer, HashMap<Long,PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
|
||||
try {
|
||||
JSONObject json_message = (JSONObject)(new JSONParser().parse(message));
|
||||
@ -122,7 +116,7 @@ public class AttributeSubscription {
|
||||
}
|
||||
detector.ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
if (!detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time) && (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(targeted_prediction_time)) && ((targeted_prediction_time * 1000 - time_horizon_seconds * 1000L) > (Clock.systemUTC()).millis())) {
|
||||
Logger.getGlobal().log(info_logging_level, "Adding a new targeted prediction time " + targeted_prediction_time + " expiring in "+(targeted_prediction_time*1000-System.currentTimeMillis())+" from topic "+topic);
|
||||
Logger.getGlobal().log(info_logging_level, "Adding a new targeted prediction time " + targeted_prediction_time + " expiring in "+(targeted_prediction_time*1000-System.currentTimeMillis())+" from topic "+forecasted_metric_topic_name);
|
||||
detector.getSubcomponent_state().adaptation_times.add(targeted_prediction_time);
|
||||
synchronized (detector.PREDICTION_EXISTS) {
|
||||
detector.PREDICTION_EXISTS.setValue(true);
|
||||
@ -130,10 +124,10 @@ public class AttributeSubscription {
|
||||
}
|
||||
}else {
|
||||
if (detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time)) {
|
||||
Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it is already present");
|
||||
Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + forecasted_metric_topic_name + " as it is already present");
|
||||
} else if (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(targeted_prediction_time)) {
|
||||
if (targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - (Clock.systemUTC()).millis() <= 0) {
|
||||
Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it would expire in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()) + " milliseconds and the prediction horizon is " + time_horizon_seconds * 1000L + " milliseconds");
|
||||
Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + forecasted_metric_topic_name + " as it would expire in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()) + " milliseconds and the prediction horizon is " + time_horizon_seconds * 1000L + " milliseconds");
|
||||
}else{
|
||||
Logger.getGlobal().log(info_logging_level,"Adding new prediction time "+targeted_prediction_time+" which expires in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()));
|
||||
detector.getSubcomponent_state().adaptation_times_pending_processing.add(targeted_prediction_time);
|
||||
|
@ -9,7 +9,7 @@ import static configuration.Constants.default_application_name;
|
||||
import static runtime.Main.detectors;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_integer_id;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
|
||||
import static utilities.DebugDataSubscription.debug_data_generation;
|
||||
import static utilities.DebugDataSubscription.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
@ -17,7 +17,7 @@ public class DetectorRequestMappings {
|
||||
|
||||
@RequestMapping("/add-new-detector")
|
||||
public static String start_new_detector_subcomponent() {
|
||||
detectors.add(new DetectorSubcomponent(default_application_name,detached));
|
||||
detectors.put(default_application_name,new DetectorSubcomponent(default_application_name,detached));
|
||||
return ("Spawned new SLO Detector subcomponent instance! Currently, there have been "+detector_integer_id+" detectors spawned");
|
||||
}
|
||||
|
||||
@ -30,7 +30,7 @@ public class DetectorRequestMappings {
|
||||
@GetMapping("/component-statistics/detectors/{id}")
|
||||
public static String get_detector_subcomponent_statistics(@PathVariable String id) {
|
||||
String detector_name = "detector_"+id;
|
||||
debug_data_generation.apply(detector_subcomponents.get(detector_name).getBrokerSubscriptionDetails(),EMPTY);
|
||||
debug_data_generation.apply(detector_subcomponents.get(detector_name).getBrokerSubscriptionDetails(debug_data_trigger_topic_name),EMPTY);
|
||||
return DetectorSubcomponent.get_detector_subcomponent_statistics();
|
||||
}
|
||||
}
|
||||
|
@ -10,13 +10,16 @@ import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.director.DirectorSubcomponent;
|
||||
import utility_beans.BrokerSubscriptionDetails;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static configuration.Constants.*;
|
||||
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.*;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
@ -32,9 +35,8 @@ public class DirectorRequestMappings {
|
||||
} catch (ParseException e) {
|
||||
return "Error in parsing the input string, the exception message follows:\n"+e;
|
||||
}
|
||||
application_name = (String) rule_representation_json.get("name");
|
||||
DetectorSubcomponent new_detector = DetectorSubcomponent.detector_subcomponents.getOrDefault(application_name,new DetectorSubcomponent(default_application_name,CharacterizedThread.CharacterizedThreadRunMode.detached));
|
||||
new_detector.slo_rule_topic_subscriber_function.apply(Constants.slo_rules_topic,string_rule_representation);
|
||||
BrokerSubscriptionDetails broker_details = new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password,nebulous_components_application,slo_rules_topic);
|
||||
DirectorSubcomponent.slo_rule_topic_subscriber_function.apply(broker_details,string_rule_representation);
|
||||
return ("New application was spawned");
|
||||
}
|
||||
|
||||
@ -58,19 +60,39 @@ public class DirectorRequestMappings {
|
||||
for (Object metric : metrics_json_array){
|
||||
JSONObject metric_json = (JSONObject) metric;
|
||||
String metric_name = (String) metric_json.get("name");
|
||||
|
||||
RealtimeMonitoringAttribute.AttributeValuesType lower_bound_type,upper_bound_type;
|
||||
|
||||
double upper_bound = 100.0,lower_bound = 0.0;
|
||||
if (((String) metric_json.get("upper_bound")).toLowerCase().contains("-inf")){
|
||||
if (((String) metric_json.get("upper_bound")).toLowerCase().contains("-inf") || ((String) metric_json.get("upper_bound")).toLowerCase().contains("-infinity")){
|
||||
upper_bound = Double.NEGATIVE_INFINITY;
|
||||
}else if (((String) metric_json.get("upper_bound")).toLowerCase().contains("inf")){
|
||||
}else if (((String) metric_json.get("upper_bound")).toLowerCase().contains("inf") || ((String) metric_json.get("upper_bound")).toLowerCase().contains("infinity")){
|
||||
upper_bound = Double.NEGATIVE_INFINITY;
|
||||
}else{
|
||||
String upper_bound_str = (String) metric_json.get("upper_bound");
|
||||
try {
|
||||
upper_bound = Integer.parseInt(upper_bound_str);
|
||||
upper_bound_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer;
|
||||
}catch (Exception e){
|
||||
try{
|
||||
upper_bound = Double.parseDouble(upper_bound_str);
|
||||
upper_bound_type = RealtimeMonitoringAttribute.AttributeValuesType.Double;
|
||||
}catch (Exception z){
|
||||
e.printStackTrace();
|
||||
z.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (((String) metric_json.get("lower_bound")).toLowerCase().contains("-inf")){
|
||||
|
||||
|
||||
if (((String) metric_json.get("lower_bound")).toLowerCase().contains("-inf") || ((String) metric_json.get("lower_bound")).toLowerCase().contains("-infinity")){
|
||||
lower_bound = Double.POSITIVE_INFINITY;
|
||||
}
|
||||
else if (((String) metric_json.get("lower_bound")).toLowerCase().contains("inf")){
|
||||
else if (((String) metric_json.get("lower_bound")).toLowerCase().contains("inf") || ((String) metric_json.get("lower_bound")).toLowerCase().contains("infinity")){
|
||||
lower_bound = Double.POSITIVE_INFINITY;
|
||||
}else {
|
||||
application_metrics.put(metric_name, new RealtimeMonitoringAttribute(metric_name, lower_bound, upper_bound, RealtimeMonitoringAttribute.AttributeValuesType.Double));
|
||||
}
|
||||
application_metrics.put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound));
|
||||
}
|
||||
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attributes(new_detector,application_metrics);
|
||||
|
@ -14,6 +14,7 @@ import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.director.DirectorSubcomponent;
|
||||
import slo_violation_detector_engine.generic.ComponentState.*;
|
||||
import utility_beans.*;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -24,6 +25,7 @@ import java.util.logging.Logger;
|
||||
import static configuration.Constants.*;
|
||||
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.*;
|
||||
import static utilities.OperationalModeUtils.getSLOViolationDetectionOperationalMode;
|
||||
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
|
||||
import static utilities.OperationalModeUtils.get_director_subscription_topics;
|
||||
@ -33,7 +35,7 @@ import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detac
|
||||
@SpringBootApplication
|
||||
public class Main {
|
||||
public static Long current_slo_rules_version = -1L;//initialization
|
||||
public static ArrayList<DetectorSubcomponent> detectors = new ArrayList<>();
|
||||
public static HashMap<String,DetectorSubcomponent> detectors = new HashMap<>();
|
||||
public static void main(String[] args) {
|
||||
|
||||
//The input of this program (the SLO Violation Detector) is the type of the SLO violations which are monitored, and the predicted metric values which are evaluated. Its output are events which produce an estimate of the probability of an adaptation.
|
||||
@ -72,18 +74,23 @@ public class Main {
|
||||
slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold"));
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions"));
|
||||
|
||||
broker_ip = prop.getProperty("broker_ip_url");
|
||||
broker_username = prop.getProperty("broker_username");
|
||||
broker_password = prop.getProperty("broker_password");
|
||||
|
||||
//director_subscription_topics = get_director_subscription_topics();
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,detached);
|
||||
detectors.add(detector);
|
||||
detectors.put(default_application_name,detector);
|
||||
ArrayList<String> unbounded_metric_strings = new ArrayList<>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
for (String metric_string : unbounded_metric_strings) {
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";", 2)[1]); //TODO delete once this information is successfully received from the AMQP broker
|
||||
}
|
||||
} //initialization
|
||||
if (operational_mode.equals(OperationalMode.DETECTOR)) {
|
||||
Logger.getGlobal().log(INFO,"Starting new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector
|
||||
Logger.getGlobal().log(INFO,"Started new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector
|
||||
}else if (operational_mode.equals(OperationalMode.DIRECTOR)){
|
||||
Logger.getGlobal().log(INFO,"Starting new Director and new Detector instance");
|
||||
Logger.getGlobal().log(INFO,"Starting new Director along the new Detector instance");
|
||||
DirectorSubcomponent director = new DirectorSubcomponent();
|
||||
SpringApplication.run(Main.class, args);
|
||||
Logger.getGlobal().log(INFO,"Execution completed");
|
||||
|
@ -20,6 +20,7 @@ import utility_beans.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
@ -63,7 +64,7 @@ public class SLORule {
|
||||
for (String metric: metric_list) {
|
||||
RealtimeMonitoringAttribute monitoring_attribute;
|
||||
if (!associated_detector.getSubcomponent_state().getMonitoring_attributes().containsKey(metric)){
|
||||
monitoring_attribute = new RealtimeMonitoringAttribute(metric);
|
||||
monitoring_attribute = new RealtimeMonitoringAttribute(metric,false, RealtimeMonitoringAttribute.AttributeValuesType.Unknown);
|
||||
associated_detector.getSubcomponent_state().getMonitoring_attributes().put(metric,monitoring_attribute);
|
||||
}
|
||||
monitoring_attributes.add(metric);
|
||||
@ -199,7 +200,7 @@ public class SLORule {
|
||||
//subrules_json_array.add
|
||||
}
|
||||
if (composite_rule){
|
||||
ArrayList<Double> individual_severity_contributions = new ArrayList<>();
|
||||
ArrayList<Number> individual_severity_contributions = new ArrayList<>();
|
||||
boolean and_subrules_invalidated = false;
|
||||
for (Object subrule: subrules_json_array) {
|
||||
JSONObject json_subrule = (JSONObject) subrule;
|
||||
@ -239,7 +240,7 @@ public class SLORule {
|
||||
rule_result_value = MathUtils.get_average(individual_severity_contributions);
|
||||
calculation_logging_string.append("Calculating average of individual severity contributions: ").append(individual_severity_contributions).append(" equals ").append(rule_result_value).append("\n");
|
||||
}else if (slo_violation_determination_method.equals("prconf-delta") && individual_severity_contributions.size()>0){
|
||||
rule_result_value = Math.sqrt(MathUtils.sum(individual_severity_contributions.stream().map(x->x*x).collect(Collectors.toList())))/Math.sqrt(individual_severity_contributions.size());
|
||||
rule_result_value = Math.sqrt(MathUtils.sum(individual_severity_contributions.stream().map(x->x.doubleValue()*x.doubleValue()).collect(Collectors.toList())))/Math.sqrt(individual_severity_contributions.size());
|
||||
calculation_logging_string.append("Calculating square root of sum of individual severity contributions: ").append(individual_severity_contributions).append(" - the result is ").append(rule_result_value).append("\n");
|
||||
|
||||
}
|
||||
|
@ -1,27 +1,26 @@
|
||||
package slo_violation_detector_engine.detector;
|
||||
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import slo_violation_detector_engine.generic.Runnables;
|
||||
import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.prop;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.attached;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.aggregate_metric_values;
|
||||
|
||||
|
||||
public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
public static final SynchronizedInteger detector_integer_id = new SynchronizedInteger();
|
||||
public static HashMap<String,DetectorSubcomponent> detector_subcomponents = new HashMap<>(); //A HashMap containing all detector subcomponents
|
||||
public static Map<String,DetectorSubcomponent> detector_subcomponents = Collections.synchronizedMap(new HashMap<>()); //A HashMap containing all detector subcomponents
|
||||
private DetectorSubcomponentState subcomponent_state;
|
||||
public final AtomicBoolean stop_signal = new AtomicBoolean(false);
|
||||
public final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false);
|
||||
@ -35,6 +34,7 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
|
||||
public Long last_processed_adaptation_time = -1L;//initialization
|
||||
private String detector_name;
|
||||
private String handled_application_name;
|
||||
private static String broker_ip = prop.getProperty("broker_ip_url");
|
||||
private static String broker_username = prop.getProperty("broker_username");
|
||||
private static String broker_password = prop.getProperty("broker_password");
|
||||
@ -53,7 +53,8 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
detector_integer_id.setValue(detector_integer_id.getValue()+1);
|
||||
current_detector_id = detector_integer_id.getValue();
|
||||
//detector_integer_id.notify();
|
||||
detector_name = "detector_"+current_detector_id;
|
||||
handled_application_name = application_name;
|
||||
detector_name = "detector_"+application_name+"_"+current_detector_id;
|
||||
}
|
||||
if (characterized_thread_run_mode.equals(attached)) {
|
||||
DetectorSubcomponentUtilities.run_slo_violation_detection_engine(this);
|
||||
@ -63,75 +64,43 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
detector_subcomponents.put(detector_name,this);
|
||||
}
|
||||
|
||||
public BiFunction<String, String, String> slo_rule_topic_subscriber_function = (topic, message) -> {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
can_modify_slo_rules.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(topic, message);
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
public void update_monitoring_attribute_value(String name,Number value){
|
||||
if(getSubcomponent_state().getMonitoring_attributes().get(name)==null){
|
||||
|
||||
Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
}
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
public BiFunction<String, String, String> metric_list_subscriber_function = (topic, message) -> {
|
||||
synchronized (can_modify_monitoring_metrics) {
|
||||
can_modify_monitoring_metrics.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(topic, message);
|
||||
//TODO add monitoring metrics bounds
|
||||
String metric_name;
|
||||
double lower_bound,upper_bound;
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject metric_list_object;
|
||||
try {
|
||||
metric_list_object = (JSONObject) parser.parse(message);
|
||||
for (Object element : (JSONArray) metric_list_object.get("metric_list")){
|
||||
metric_name = (String)((JSONObject)element).get("name");
|
||||
String lower_bound_str = (String)((JSONObject)element).get("lower_bound");
|
||||
String upper_bound_str = (String)((JSONObject)element).get("upper_bound");
|
||||
if (!(lower_bound_str.toLowerCase().equals("-inf") || lower_bound_str.toLowerCase().equals("-infinity"))){
|
||||
lower_bound = Double.parseDouble(lower_bound_str);
|
||||
}else{
|
||||
lower_bound = Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
|
||||
if (!(upper_bound_str.toLowerCase().equals("inf") || upper_bound_str.toLowerCase().equals("infinity"))){
|
||||
upper_bound = Double.parseDouble(upper_bound_str);
|
||||
}else{
|
||||
upper_bound = Double.POSITIVE_INFINITY;
|
||||
}
|
||||
|
||||
subcomponent_state.getMonitoring_attributes().put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
RealtimeMonitoringAttribute.AttributeValuesType attribute_type;
|
||||
if (value instanceof Integer){
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer;
|
||||
}else if (value instanceof Double){
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Double;
|
||||
}else{
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Unknown;
|
||||
}
|
||||
getSubcomponent_state().getMonitoring_attributes().put(name,new RealtimeMonitoringAttribute(name,false,attribute_type));
|
||||
//monitoring_attributes_max_values.put(name,value);
|
||||
//monitoring_attributes_min_values.put(name,value);
|
||||
|
||||
//slo_rule_arrived.set(true);
|
||||
can_modify_monitoring_metrics.notifyAll();
|
||||
|
||||
Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
}else if (getSubcomponent_state().getMonitoring_attributes().get(name).getType()==RealtimeMonitoringAttribute.AttributeValuesType.Unknown){
|
||||
RealtimeMonitoringAttribute.AttributeValuesType attribute_type;
|
||||
if (value instanceof Integer){
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer;
|
||||
}else if (value instanceof Double){
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Double;
|
||||
}else{
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Unknown;
|
||||
}
|
||||
getSubcomponent_state().getMonitoring_attributes().get(name).setType(attribute_type);
|
||||
}
|
||||
return "Monitoring metrics message processed";
|
||||
};
|
||||
public static BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location);
|
||||
public static BiFunction<String, String, String> device_lost_subscriber_function = (topic, message) -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location);
|
||||
|
||||
Clock clock = Clock.systemUTC();
|
||||
Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0);
|
||||
JSONObject severity_json = new JSONObject();
|
||||
severity_json.put("severity", 100);
|
||||
severity_json.put("probability", 100);
|
||||
severity_json.put("predictionTime", current_time_seconds);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
|
||||
//TODO possibly necessary to remove the next adaptation time as it will probably not be possible to start an adaptation during it
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
getSubcomponent_state().getMonitoring_attributes().get(name).getActual_metric_values().add(value);
|
||||
getSubcomponent_state().getMonitoring_attributes_statistics().get(name).update_attribute_statistics(value);
|
||||
/*
|
||||
if(get_90th_percentile_high_value(name,value)>monitoring_attributes_max_values.get(name)){
|
||||
monitoring_attributes_max_values.put(name,value);
|
||||
}else if (get_90th_percentile_low_value(name,value)<monitoring_attributes_min_values.get(name)){
|
||||
monitoring_attributes_min_values.put(name,value);
|
||||
}
|
||||
*/
|
||||
}
|
||||
public static String get_detector_subcomponent_statistics() {
|
||||
return "Currently, the number of active detectors are "+detector_integer_id;
|
||||
}
|
||||
@ -153,7 +122,38 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
return detector_name;
|
||||
}
|
||||
|
||||
public BrokerSubscriptionDetails getBrokerSubscriptionDetails() {
|
||||
return new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password);
|
||||
public void set_name(String name){
|
||||
detector_name = name;
|
||||
}
|
||||
|
||||
public String get_application_name(){
|
||||
return handled_application_name;
|
||||
}
|
||||
|
||||
public BrokerSubscriptionDetails getBrokerSubscriptionDetails(String topic) {
|
||||
return new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password,handled_application_name,topic);
|
||||
}
|
||||
|
||||
public double get_metric_value(String metric_name){
|
||||
CircularFifoQueue<Number> actual_metric_values = getSubcomponent_state().getMonitoring_attributes().get(metric_name).getActual_metric_values();
|
||||
if (actual_metric_values.size()==0){
|
||||
Logger.getGlobal().log(warning_logging_level,"Trying to retrieve realtime values from an empty queue for metric "+metric_name);
|
||||
}
|
||||
return aggregate_metric_values(actual_metric_values);
|
||||
}
|
||||
|
||||
public static DetectorSubcomponent get_associated_detector(String application_name){
|
||||
DetectorSubcomponent associated_detector = detector_subcomponents.get(application_name);
|
||||
if (associated_detector==null){
|
||||
if (detector_subcomponents.size()==1 && detector_subcomponents.get(default_application_name)!=null){//This means only the initial 'default' application exists
|
||||
associated_detector = detector_subcomponents.get(default_application_name);
|
||||
associated_detector.set_name(application_name);
|
||||
}
|
||||
else {
|
||||
associated_detector = new DetectorSubcomponent(application_name, CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
}
|
||||
}
|
||||
return associated_detector;
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ import java.util.HashSet;
|
||||
|
||||
import static configuration.Constants.kept_values_per_metric;
|
||||
|
||||
public class DetectorSubcomponentState {
|
||||
public class DetectorSubcomponentState{
|
||||
private HashMap<String, MonitoringAttributeStatistics> monitoring_attributes_statistics = new HashMap<>();
|
||||
private HashMap<String, MonitoringAttributeStatistics> monitoring_attributes_roc_statistics = new HashMap<>();
|
||||
private HashMap<String, RealtimeMonitoringAttribute> monitoring_attributes = new HashMap<>();
|
||||
|
@ -1,5 +1,6 @@
|
||||
package slo_violation_detector_engine.detector;
|
||||
|
||||
import groovy.util.logging.Log;
|
||||
import metric_retrieval.AttributeSubscription;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
@ -23,8 +24,8 @@ import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.device_lost_subscriber;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.device_lost_subscriber_function;
|
||||
import static slo_violation_detector_engine.director.DirectorSubcomponent.MESSAGE_CONTENTS;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.prop;
|
||||
import static slo_violation_detector_engine.generic.Runnables.get_severity_calculation_runnable;
|
||||
import static runtime.Main.*;
|
||||
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
|
||||
@ -261,105 +262,18 @@ public class DetectorSubcomponentUtilities {
|
||||
|
||||
public static void run_slo_violation_detection_engine(DetectorSubcomponent associated_detector_subcomponent) {
|
||||
while (true) {
|
||||
if (first_run){
|
||||
//Creation of threads that should always run and are independent of the monitored application.
|
||||
//1. Creation of the metric list input subscriber thread, which listens for the metrics to be considered
|
||||
//2. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered
|
||||
//3. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device
|
||||
|
||||
//Metric list subscription thread
|
||||
BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
Runnable metric_list_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = metric_list_subscriber.subscribe(associated_detector_subcomponent.metric_list_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}else{
|
||||
did_not_finish_execution_gracefully = false;
|
||||
}
|
||||
}
|
||||
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
|
||||
};
|
||||
CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
|
||||
|
||||
|
||||
|
||||
//SLO rule subscription thread
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
Runnable slo_rules_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = slo_rule_topic_subscriber.subscribe(associated_detector_subcomponent.slo_rule_topic_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level, "Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}else{
|
||||
did_not_finish_execution_gracefully = false;
|
||||
}
|
||||
}
|
||||
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
|
||||
};
|
||||
CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.persistent_running_detector_thread);
|
||||
|
||||
|
||||
//Implementation of 'Lost edge device' thread
|
||||
|
||||
Runnable device_lost_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false)
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level, "A device used by the platform was lost, will therefore trigger a reconfiguration");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}else{
|
||||
did_not_finish_execution_gracefully = false;
|
||||
}
|
||||
}
|
||||
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
|
||||
};
|
||||
|
||||
CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
|
||||
|
||||
|
||||
if (self_publish_rule_file) {
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
String rules_json_string = null;
|
||||
try {
|
||||
rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getGlobal().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
}
|
||||
first_run = false;
|
||||
Logger.getGlobal().log(info_logging_level,"Initializing new slo violation detection engine for "+associated_detector_subcomponent.get_name());
|
||||
synchronized (associated_detector_subcomponent.can_modify_slo_rules) {
|
||||
do {
|
||||
while((!associated_detector_subcomponent.can_modify_slo_rules.getValue()) || (!associated_detector_subcomponent.slo_rule_arrived.get())){
|
||||
try {
|
||||
associated_detector_subcomponent.can_modify_slo_rules.wait();
|
||||
}catch (InterruptedException i){
|
||||
i.printStackTrace();
|
||||
}
|
||||
}while((!associated_detector_subcomponent.can_modify_slo_rules.getValue()) || (!associated_detector_subcomponent.slo_rule_arrived.get()));
|
||||
}
|
||||
associated_detector_subcomponent.can_modify_slo_rules.setValue(false);
|
||||
associated_detector_subcomponent.slo_rule_arrived.set(false);
|
||||
String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic);
|
||||
String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(associated_detector_subcomponent.get_application_name(),slo_rules_topic);
|
||||
if (slo_rule_arrived_has_updated_version(rule_representation)) {
|
||||
if (single_slo_rule_active) {
|
||||
associated_detector_subcomponent.getSubcomponent_state().slo_rules.clear();
|
||||
@ -378,8 +292,8 @@ public class DetectorSubcomponentUtilities {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
stop_all_running_threads(associated_detector_subcomponent);
|
||||
Logger.getGlobal().log(info_logging_level,"Initializing debug instance for detector component "+associated_detector_subcomponent.get_name());
|
||||
DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password"),associated_detector_subcomponent);
|
||||
initialize_monitoring_datastructures_with_empty_data(associated_detector_subcomponent.getSubcomponent_state().slo_rules);
|
||||
//
|
||||
|
@ -1,11 +1,28 @@
|
||||
package slo_violation_detector_engine.director;
|
||||
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.text.NumberFormat;
|
||||
import java.time.Clock;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.get_associated_detector;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.*;
|
||||
import static utilities.OperationalModeUtils.get_director_publishing_topics;
|
||||
import static utilities.OperationalModeUtils.get_director_subscription_topics;
|
||||
|
||||
@ -15,6 +32,11 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
private Integer id = 1;
|
||||
public static HashMap<String,DirectorSubcomponent> director_subcomponents = new HashMap<>();
|
||||
private static DirectorSubcomponent master_director;
|
||||
public static boolean first_run = true;
|
||||
public final AtomicBoolean stop_signal = new AtomicBoolean(false);
|
||||
public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap();
|
||||
public final SynchronizedBoolean can_modify_monitoring_metrics = new SynchronizedBoolean(false);
|
||||
|
||||
private String director_name;
|
||||
|
||||
public static DirectorSubcomponent getMaster_director() {
|
||||
@ -35,6 +57,115 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
}
|
||||
|
||||
private void create_director_topic_subscribers(){
|
||||
if (first_run){
|
||||
//Creation of threads that should always run and are independent of the monitored application.
|
||||
//1. Creation of the metric list input subscriber thread, which listens for the metrics to be considered
|
||||
//2. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered
|
||||
//3. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device
|
||||
|
||||
//Metric list subscription thread
|
||||
BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,nebulous_components_application);
|
||||
Runnable metric_list_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = metric_list_subscriber.subscribe(metric_list_subscriber_function, this.stop_signal); //This subscriber should not be immune to stop signals
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}else{
|
||||
did_not_finish_execution_gracefully = false;
|
||||
}
|
||||
}
|
||||
persistent_running_director_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
|
||||
};
|
||||
CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread);
|
||||
|
||||
|
||||
|
||||
//SLO rule subscription thread
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,nebulous_components_application);
|
||||
Runnable slo_rules_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = slo_rule_topic_subscriber.subscribe(slo_rule_topic_subscriber_function, stop_signal); //This subscriber should not be immune to stop signals
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level, "Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}else{
|
||||
did_not_finish_execution_gracefully = false;
|
||||
}
|
||||
}
|
||||
persistent_running_director_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
|
||||
};
|
||||
CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location,nebulous_components_application);
|
||||
BiFunction<BrokerSubscriptionDetails, String, String> device_lost_subscriber_function = (broker_details, message) -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location);
|
||||
|
||||
Clock clock = Clock.systemUTC();
|
||||
Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0);
|
||||
JSONObject severity_json = new JSONObject();
|
||||
severity_json.put("severity", 100);
|
||||
severity_json.put("probability", 100);
|
||||
severity_json.put("predictionTime", current_time_seconds);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
|
||||
return topic_for_lost_device_announcement + ":MSG:" + message;
|
||||
};
|
||||
|
||||
|
||||
//Implementation of 'Lost edge device' thread
|
||||
|
||||
Runnable device_lost_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false)
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level, "A device used by the platform was lost, will therefore trigger a reconfiguration");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}else{
|
||||
did_not_finish_execution_gracefully = false;
|
||||
}
|
||||
}
|
||||
persistent_running_director_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
|
||||
};
|
||||
|
||||
CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread);
|
||||
|
||||
|
||||
if (self_publish_rule_file) {
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
String rules_json_string = null;
|
||||
try {
|
||||
rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getGlobal().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
}
|
||||
first_run = false;
|
||||
|
||||
|
||||
for (String subscription_topic : get_director_subscription_topics()){
|
||||
//TODO subscribe to each topic, creating a Characterized thread for each of them
|
||||
|
||||
@ -45,6 +176,93 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
//subscribing_connector = new Connector("slovid_director",)
|
||||
}
|
||||
|
||||
public BiFunction<BrokerSubscriptionDetails, String, String> metric_list_subscriber_function = (broker_details, message) -> {
|
||||
DetectorSubcomponent associated_detector = get_associated_detector(broker_details.getApplication_name());
|
||||
synchronized (can_modify_monitoring_metrics) {
|
||||
can_modify_monitoring_metrics.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(broker_details.getApplication_name(),metric_list_topic, message);
|
||||
String metric_name;
|
||||
double lower_bound,upper_bound;
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject metric_list_object;
|
||||
try {
|
||||
metric_list_object = (JSONObject) parser.parse(message);
|
||||
for (Object element : (JSONArray) metric_list_object.get("metric_list")){
|
||||
metric_name = (String)((JSONObject)element).get("name");
|
||||
String lower_bound_str = (String)((JSONObject)element).get("lower_bound");
|
||||
String upper_bound_str = (String)((JSONObject)element).get("upper_bound");
|
||||
NumberFormat numberFormat = NumberFormat.getInstance();
|
||||
Number lower_bound_number, upper_bound_number;
|
||||
boolean is_lower_bound_integer=false,is_lower_bound_double=false;
|
||||
boolean is_upper_bound_integer = false,is_upper_bound_double=false;
|
||||
if (!(lower_bound_str.equalsIgnoreCase("-inf") || lower_bound_str.equalsIgnoreCase("-infinity"))){
|
||||
lower_bound_number = numberFormat.parse(lower_bound_str);
|
||||
if (lower_bound_number instanceof Integer){
|
||||
is_lower_bound_integer = true;
|
||||
is_lower_bound_double = false;
|
||||
}else if (lower_bound_number instanceof Double){
|
||||
is_lower_bound_double = true;
|
||||
is_lower_bound_integer = false;
|
||||
}
|
||||
lower_bound = lower_bound_number.doubleValue();
|
||||
}else{
|
||||
lower_bound = Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
|
||||
if (!(upper_bound_str.equalsIgnoreCase("inf") || upper_bound_str.equalsIgnoreCase("infinity"))){
|
||||
upper_bound_number = numberFormat.parse(upper_bound_str);
|
||||
if (upper_bound_number instanceof Integer){
|
||||
is_upper_bound_integer = true;
|
||||
is_upper_bound_double = false;
|
||||
}else if (upper_bound_number instanceof Double){
|
||||
is_upper_bound_double = true;
|
||||
is_upper_bound_integer = false;
|
||||
}
|
||||
upper_bound = upper_bound_number.doubleValue();
|
||||
}else{
|
||||
upper_bound = Double.POSITIVE_INFINITY;
|
||||
}
|
||||
RealtimeMonitoringAttribute.AttributeValuesType attribute_type;
|
||||
if (is_upper_bound_integer && is_lower_bound_integer){
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Integer;
|
||||
}else if (is_lower_bound_double || is_upper_bound_double){
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Double;
|
||||
}else{
|
||||
attribute_type = RealtimeMonitoringAttribute.AttributeValuesType.Unknown;
|
||||
}
|
||||
|
||||
associated_detector.getSubcomponent_state().getMonitoring_attributes().put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound,attribute_type));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
//slo_rule_arrived.set(true);
|
||||
can_modify_monitoring_metrics.notifyAll();
|
||||
|
||||
Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + metric_list_topic);
|
||||
|
||||
}
|
||||
return "Monitoring metrics message processed";
|
||||
};
|
||||
|
||||
public static BiFunction<BrokerSubscriptionDetails, String, String> slo_rule_topic_subscriber_function = (broker_details, message) -> {
|
||||
//DetectorSubcomponent new_detector = new DetectorSubcomponent(application, CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
String application = broker_details.getApplication_name();
|
||||
DetectorSubcomponent new_detector = get_associated_detector(application);
|
||||
detector_subcomponents.put(application,new_detector);
|
||||
synchronized (new_detector.can_modify_slo_rules) {
|
||||
new_detector.can_modify_slo_rules.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(application,slo_rules_topic, message);
|
||||
new_detector.slo_rule_arrived.set(true);
|
||||
new_detector.can_modify_slo_rules.notifyAll();
|
||||
|
||||
Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + slo_rules_topic);
|
||||
|
||||
}
|
||||
return slo_rules_topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
@Override
|
||||
public String get_name() {
|
||||
return director_name;
|
||||
|
@ -0,0 +1,11 @@
|
||||
package slo_violation_detector_engine.generic;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class ComponentState {
|
||||
|
||||
public static Properties prop = new Properties();
|
||||
public static String broker_ip ="localhost";
|
||||
public static String broker_username= "admin";
|
||||
public static String broker_password= "admin";
|
||||
}
|
@ -7,10 +7,12 @@ import utility_beans.BrokerPublisher;
|
||||
import org.json.simple.JSONObject;
|
||||
import slo_rule_modelling.SLORule;
|
||||
import utility_beans.BrokerSubscriber;
|
||||
import utility_beans.BrokerSubscriptionDetails;
|
||||
import utility_beans.CharacterizedThread;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.logging.Logger;
|
||||
@ -18,24 +20,27 @@ import java.util.logging.Logger;
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Thread.sleep;
|
||||
import static slo_rule_modelling.SLORule.process_rule_value;
|
||||
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.*;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponentUtilities.*;
|
||||
import static utilities.DebugDataSubscription.*;
|
||||
|
||||
public class Runnables {
|
||||
|
||||
public static class DebugDataRunnable implements Runnable{
|
||||
DetectorSubcomponent detector;
|
||||
private DetectorSubcomponent detector; //TODO Verify whether or not we need this message per detector or on a detector-independent, all-application way
|
||||
public DebugDataRunnable(DetectorSubcomponent detector){
|
||||
this.detector = detector;
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Logger.getGlobal().log(info_logging_level,"Starting to subscribe to debug output trigger");
|
||||
synchronized (detector.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_topic_name).getValue())
|
||||
debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, detector.getBrokerSubscriptionDetails().getBroker_ip(),detector.getBrokerSubscriptionDetails().getBroker_username(),detector.getBrokerSubscriptionDetails().getBroker_password(), amq_library_configuration_location);
|
||||
BrokerSubscriptionDetails broker_details = detector.getBrokerSubscriptionDetails(debug_data_trigger_topic_name);
|
||||
debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_details.getBroker_ip(),broker_details.getBroker_username(),broker_details.getBroker_password(), amq_library_configuration_location,detector.get_application_name());
|
||||
debug_data_subscriber.subscribe(debug_data_generation, detector.stop_signal);
|
||||
Logger.getGlobal().log(info_logging_level,"Debug data subscriber initiated");
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
@ -67,7 +72,7 @@ public class Runnables {
|
||||
public static Runnable get_severity_calculation_runnable(SLORule rule, DetectorSubcomponent detector) {
|
||||
|
||||
Runnable severity_calculation_runnable = () -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip,broker_username,broker_password, amq_library_configuration_location);
|
||||
|
||||
while (!detector.stop_signal.get()) {
|
||||
synchronized (detector.PREDICTION_EXISTS) {
|
||||
@ -137,7 +142,7 @@ public class Runnables {
|
||||
severity_json.put("severity", rule_severity);
|
||||
severity_json.put("probability", slo_violation_probability);
|
||||
severity_json.put("predictionTime", targeted_prediction_time);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
persistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(detector.get_application_name()));
|
||||
}
|
||||
|
||||
detector.getSubcomponent_state().slo_violation_event_recording_queue.add(System.currentTimeMillis());
|
||||
|
@ -16,9 +16,7 @@ public class SLOViolationDetectorStateUtils {
|
||||
private static String self_starting_command_string = "java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar > $LOG_FILE 2>&1";
|
||||
public static OperationalMode operational_mode;
|
||||
public final SynchronizedInteger create_new_slo_detector = new SynchronizedInteger(0);
|
||||
public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap();
|
||||
|
||||
public static Properties prop = new Properties();
|
||||
|
||||
|
||||
public static InputStream getPreferencesFileInputStream(String custom_properties_file_path) throws IOException {
|
||||
|
@ -16,7 +16,6 @@ import java.util.logging.Logger;
|
||||
import static configuration.Constants.*;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
|
||||
import static slo_violation_detector_engine.director.DirectorSubcomponent.director_subcomponents;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.get_metric_value;
|
||||
|
||||
/**
|
||||
* The objective of this class is to allow a structured synopsis of the current state of the SLO Violation Detector to be created, as a response to a request sent to it through an appropriate topic.
|
||||
@ -25,7 +24,6 @@ public class DebugDataSubscription {
|
||||
|
||||
public static String debug_data_trigger_topic_name = "sloviolationdetector.debug";
|
||||
public static String debug_data_output_topic_name = "sloviolationdetector.debug_output";
|
||||
private static String broker_username,broker_password,broker_ip_address;
|
||||
public static BiFunction <BrokerSubscriptionDetails,String,String> debug_data_generation = (broker_subscription_details, message) ->{
|
||||
|
||||
String output_debug_data = "";
|
||||
@ -93,8 +91,8 @@ public class DebugDataSubscription {
|
||||
intermediate_debug_string.append("\t- Metric name: ").append(entry.getKey());
|
||||
}
|
||||
|
||||
Double metric_value = get_metric_value(detector,entry.getKey());
|
||||
CircularFifoQueue<Double> metric_values = entry.getValue().getActual_metric_values();
|
||||
Double metric_value = detector.get_metric_value(entry.getKey());
|
||||
CircularFifoQueue<Number> metric_values = entry.getValue().getActual_metric_values();
|
||||
if (metric_value.isNaN()) {
|
||||
intermediate_debug_string.append(" - value was determined as NaN, individual collected values are ").append(metric_values).append("\n");
|
||||
} else if (metric_value.isInfinite()) {
|
||||
|
@ -12,11 +12,16 @@ package utilities;
|
||||
import java.util.List;
|
||||
|
||||
public class MathUtils {
|
||||
public static double get_average(Iterable<Double> values){
|
||||
|
||||
public static double get_average(Iterable<Number> values){
|
||||
double sum = 0;
|
||||
int counter = 0;
|
||||
for (Double value : values){
|
||||
sum = sum+value;
|
||||
for (Number value :values){
|
||||
if (value instanceof Double){
|
||||
sum = sum+(Double)value;
|
||||
} else if (value instanceof Integer) {
|
||||
sum = sum+(Integer)value;
|
||||
}
|
||||
counter++;
|
||||
}
|
||||
return (sum/counter);
|
||||
|
@ -48,7 +48,7 @@ public class MonitoringAttributeUtilities {
|
||||
detector_state.getMonitoring_attributes().remove(monitoring_metric_name);
|
||||
}
|
||||
|
||||
detector_state.getMonitoring_attributes().put(monitoring_metric_name, new RealtimeMonitoringAttribute(monitoring_metric_name));
|
||||
detector_state.getMonitoring_attributes().put(monitoring_metric_name, new RealtimeMonitoringAttribute(monitoring_metric_name,false, RealtimeMonitoringAttribute.AttributeValuesType.Unknown));
|
||||
|
||||
detector_state.getMonitoring_attributes_roc_statistics().put(monitoring_metric_name,new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound
|
||||
|
||||
|
@ -8,6 +8,7 @@ import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.*;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
@ -15,6 +16,7 @@ import java.util.logging.Logger;
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Thread.sleep;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
|
||||
import static utilities.DebugDataSubscription.debug_data_output_topic_name;
|
||||
|
||||
public class BrokerPublisher {
|
||||
@ -89,19 +91,26 @@ public class BrokerPublisher {
|
||||
}
|
||||
}
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.WARNING,"Could not parse the string content to be published to the broker as json");
|
||||
}
|
||||
if (private_publisher_instance!=null) {
|
||||
private_publisher_instance.send(json_object);
|
||||
}else{
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not send message to AMQP broker, as the broker ip to be used has not been specified");
|
||||
//TODO The methods below assume that the only content to be sent is json-like
|
||||
public void publish (String json_string_content, Iterable<String> application_names){
|
||||
|
||||
for (String application_name : application_names) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try {
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
} catch (ParseException p) {
|
||||
Logger.getGlobal().log(Level.WARNING, "Could not parse the string content to be published to the broker as json");
|
||||
}
|
||||
if (private_publisher_instance != null) {
|
||||
private_publisher_instance.send(json_object);
|
||||
} else {
|
||||
Logger.getGlobal().log(Level.SEVERE, "Could not send message to AMQP broker, as the broker ip to be used has not been specified");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void publish(String json_string_content) {
|
||||
publish(json_string_content,detector_subcomponents.keySet());
|
||||
}
|
||||
}
|
||||
|
@ -18,21 +18,25 @@ import static java.util.logging.Level.INFO;
|
||||
|
||||
public class BrokerSubscriber {
|
||||
|
||||
private static class MessageProcessingHandler extends Handler{
|
||||
private class MessageProcessingHandler extends Handler{
|
||||
private BrokerSubscriptionDetails broker_details;
|
||||
private static final BiFunction temporary_function = (Object o, Object o2) -> {
|
||||
//System.out.println("");
|
||||
Logger.getGlobal().log(INFO,"REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
|
||||
return "IN_PROCESSING";
|
||||
};
|
||||
private BiFunction<String,String,String> processing_function;
|
||||
private BiFunction<BrokerSubscriptionDetails,String,String> processing_function;
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
processing_function.apply(address, JSONValue.toJSONString(body));
|
||||
Logger.getGlobal().log(info_logging_level,"Handling message for address "+address);
|
||||
processing_function.apply(broker_details, JSONValue.toJSONString(body));
|
||||
}
|
||||
public MessageProcessingHandler(){
|
||||
public MessageProcessingHandler(BrokerSubscriptionDetails broker_details){
|
||||
this.broker_details = broker_details;
|
||||
this.processing_function = temporary_function;
|
||||
}
|
||||
public MessageProcessingHandler(BiFunction biFunction){
|
||||
public MessageProcessingHandler(BiFunction<BrokerSubscriptionDetails,String,String> biFunction, BrokerSubscriptionDetails broker_details){
|
||||
this.broker_details = broker_details;
|
||||
this.processing_function = biFunction;
|
||||
}
|
||||
public BiFunction getProcessing_function() {
|
||||
@ -49,7 +53,8 @@ public class BrokerSubscriber {
|
||||
private String broker_ip;
|
||||
private String brokerUsername;
|
||||
private String brokerPassword;
|
||||
public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation){
|
||||
BrokerSubscriptionDetails broker_details;
|
||||
public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String application_name){
|
||||
boolean able_to_initialize_BrokerSubscriber = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
|
||||
|
||||
if (!able_to_initialize_BrokerSubscriber){
|
||||
@ -62,6 +67,7 @@ public class BrokerSubscriber {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
broker_details = new BrokerSubscriptionDetails(broker_ip,brokerUsername,brokerPassword,application_name,topic);
|
||||
boolean subscriber_configuration_changed;
|
||||
if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)){
|
||||
HashSet<String> topics_to_subscribe_to = new HashSet<>();
|
||||
@ -84,7 +90,7 @@ public class BrokerSubscriber {
|
||||
}
|
||||
}
|
||||
if (subscriber_configuration_changed){
|
||||
Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(),true,true);
|
||||
Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details),true,true);
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,current_consumer);
|
||||
|
||||
this.topic = topic;
|
||||
@ -142,7 +148,7 @@ public class BrokerSubscriber {
|
||||
active_consumers_per_topic_per_broker_ip.put(broker_ip,new HashMap<>());
|
||||
}
|
||||
//Then add the new consumer
|
||||
Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function),true,true);
|
||||
Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function,broker_details),true,true);
|
||||
new_consumer.setProperty("topic",topic);
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,new_consumer);
|
||||
add_topic_consumer_to_broker_connector(new_consumer);
|
||||
@ -175,11 +181,11 @@ public class BrokerSubscriber {
|
||||
|
||||
public static class TopicNames{
|
||||
public static String realtime_metric_values_topic(String metric) {
|
||||
return realtime_metrics_topic+metric;
|
||||
return topic_prefix_realtime_metrics +metric;
|
||||
}
|
||||
|
||||
public static String final_metric_predictions_topic(String metric) {
|
||||
return final_metric_prediction_topic+metric;
|
||||
return topic_prefix_final_predicted_metrics +metric;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,16 +1,21 @@
|
||||
package utility_beans;
|
||||
|
||||
import static configuration.Constants.EMPTY;
|
||||
import static configuration.Constants.default_application_name;
|
||||
|
||||
public class BrokerSubscriptionDetails {
|
||||
String broker_username = "admin";
|
||||
String broker_password = "admin";
|
||||
String broker_ip = "localhost";
|
||||
String application_name = default_application_name;
|
||||
String topic = EMPTY;
|
||||
|
||||
public BrokerSubscriptionDetails(String broker_ip, String broker_username, String broker_password) {
|
||||
public BrokerSubscriptionDetails(String broker_ip, String broker_username, String broker_password,String application_name, String topic) {
|
||||
this.broker_ip = broker_ip;
|
||||
this.broker_username = broker_username;
|
||||
this.broker_password = broker_password;
|
||||
this.topic = topic;
|
||||
this.application_name = application_name;
|
||||
}
|
||||
|
||||
public BrokerSubscriptionDetails(boolean fake_broker_subscription) {
|
||||
@ -18,6 +23,8 @@ public class BrokerSubscriptionDetails {
|
||||
this.broker_username = EMPTY;
|
||||
this.broker_password = EMPTY;
|
||||
this.broker_ip = EMPTY;
|
||||
this.topic = EMPTY;
|
||||
this.application_name = EMPTY;
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,4 +51,20 @@ public class BrokerSubscriptionDetails {
|
||||
public void setBroker_ip(String broker_ip) {
|
||||
this.broker_ip = broker_ip;
|
||||
}
|
||||
|
||||
public String getApplication_name() {
|
||||
return application_name;
|
||||
}
|
||||
|
||||
public void setApplication_name(String application_name) {
|
||||
this.application_name = application_name;
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
}
|
||||
|
@ -15,13 +15,179 @@ import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utilities.DebugDataSubscription.debug_data_trigger_topic_name;
|
||||
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class CustomDataPublisher {
|
||||
private static final Map<String, String> presetTexts = new HashMap<>();
|
||||
|
||||
static {
|
||||
update_event_data();
|
||||
}
|
||||
|
||||
private static void update_event_data(){
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.slo.new", "{\n" +
|
||||
" \"name\": \"_Application1\",\n" +
|
||||
" \"operator\": \"OR\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_and_memory_or_swap_too_high\",\n" +
|
||||
" \"operator\": \"AND\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_usage_high\",\n" +
|
||||
" \"metric\": \"cpu_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 80.0\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"memory_or_swap_usage_high\",\n" +
|
||||
" \"operator\": \"OR\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"memory_usage_high\",\n" +
|
||||
" \"metric\": \"ram_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 70.0\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"disk_usage_high\",\n" +
|
||||
" \"metric\": \"swap_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 50.0\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
"}");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.realtime.cpu_usage",
|
||||
"{\n" +
|
||||
" \"metricValue\": 12.34,\n" +
|
||||
" \"level\": 1,\n" +
|
||||
" \"component_id\":\"postgresql_1\",\n" +
|
||||
" \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" +
|
||||
"}\n");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.predicted.cpu_usage", "{\n" +
|
||||
" \"metricValue\": 92.34,\n" +
|
||||
" \"level\": 1,\n" +
|
||||
" \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" +
|
||||
" \"probability\": 0.98,\n" +
|
||||
" \"confidence_interval\" : [8,15]\n" +
|
||||
" \"predictionTime\": "+(int)(10+System.currentTimeMillis()/1000)+"\n" +
|
||||
"}");
|
||||
}
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
|
||||
boolean publisher_configuration_changed;
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
private_publisher_instance = new Publisher(slovid_publisher_key,broker_topic,true,true);
|
||||
publishers.add(private_publisher_instance);
|
||||
|
||||
|
||||
Connector connector = new Connector("slovid",
|
||||
new ConnectorHandler() {
|
||||
}, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
connector.start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
JFrame frame = new JFrame("Broker input app");
|
||||
JTextField broker_ipTextField = new JTextField("localhost", 10);
|
||||
JComboBox<String> smallTextField = new JComboBox<>(new String[]{"eu.nebulouscloud.monitoring.slo.new","eu.nebulouscloud.monitoring.realtime.cpu_usage", "eu.nebulouscloud.monitoring.predicted.cpu_usage", debug_data_trigger_topic_name});
|
||||
smallTextField.setEditable(true);
|
||||
JTextField othersmallTextField = new JTextField("slovid", 10);
|
||||
JTextArea largeTextArea = new JTextArea(10, 25);
|
||||
JButton submitButton = new JButton("Send");
|
||||
|
||||
AtomicReference<String> broker_ip = new AtomicReference<>();
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<String> message_payload = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_key = new AtomicReference<>();
|
||||
|
||||
smallTextField.addActionListener(e -> {
|
||||
update_event_data();
|
||||
String selectedOption = (String) smallTextField.getSelectedItem();
|
||||
String presetText = presetTexts.getOrDefault(selectedOption, "");
|
||||
largeTextArea.setText(presetText);
|
||||
});
|
||||
|
||||
submitButton.addActionListener(e -> {
|
||||
broker_ip.set(broker_ipTextField.getText());
|
||||
broker_topic.set(smallTextField.getSelectedItem().toString());
|
||||
message_payload.set(largeTextArea.getText());
|
||||
publisher_key.set(othersmallTextField.getText());
|
||||
CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(), broker_ip.toString(), "admin", "admin", EMPTY, publisher_key.toString());
|
||||
publisher.publish(message_payload.toString());
|
||||
});
|
||||
|
||||
JPanel panel = new JPanel();
|
||||
panel.add(new JLabel("Broker to publish to:"));
|
||||
panel.add(broker_ipTextField);
|
||||
panel.add(new JLabel("Topic to publish to:"));
|
||||
panel.add(smallTextField);
|
||||
panel.add(new JLabel("Key to publish with:"));
|
||||
panel.add(othersmallTextField);
|
||||
panel.add(new JLabel("Text to publish:"));
|
||||
panel.add(new JScrollPane(largeTextArea));
|
||||
panel.add(submitButton);
|
||||
|
||||
frame.add(panel);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
}
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
|
||||
}
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
|
||||
public void publish(JSONObject json_object) {
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class OldCustomDataPublisher {
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
public OldCustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
|
||||
boolean publisher_configuration_changed;
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
@ -49,7 +215,7 @@ public class CustomDataPublisher {
|
||||
}
|
||||
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
public OldCustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key);
|
||||
}
|
||||
public static void main(String[] args){
|
||||
@ -58,25 +224,30 @@ public class CustomDataPublisher {
|
||||
//msg.put("key","value");
|
||||
|
||||
JFrame frame = new JFrame("Broker input app");
|
||||
JTextField broker_ipTextField = new JTextField("localhost",10);
|
||||
JTextField smallTextField = new JTextField("eu.nebulouscloud.monitoring.metric_list",30);
|
||||
JTextField othersmallTextField = new JTextField("slovid",20);
|
||||
JTextArea largeTextArea = new JTextArea(10, 30);
|
||||
JButton submitButton = new JButton("Send");
|
||||
|
||||
AtomicReference<String> broker_ip = new AtomicReference<>();
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<String> message_payload = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_key = new AtomicReference<>();
|
||||
|
||||
|
||||
submitButton.addActionListener(e -> {
|
||||
broker_ip.set(broker_ipTextField.getText());
|
||||
broker_topic.set(smallTextField.getText());
|
||||
message_payload.set(largeTextArea.getText());
|
||||
publisher_key.set(othersmallTextField.getText());
|
||||
CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(),"localhost","admin","admin",EMPTY,publisher_key.toString());
|
||||
OldCustomDataPublisher publisher = new OldCustomDataPublisher(broker_topic.toString(),broker_ip.toString(),"admin","admin",EMPTY,publisher_key.toString());
|
||||
publisher.publish(message_payload.toString());
|
||||
});
|
||||
|
||||
JPanel panel = new JPanel();
|
||||
panel.add(new JLabel("Broker to publish to:"));
|
||||
panel.add(broker_ipTextField);
|
||||
panel.add(new JLabel("Topic to publish to:"));
|
||||
panel.add(smallTextField);
|
||||
panel.add(new JLabel("Key to publish with:"));
|
||||
|
@ -45,13 +45,13 @@ public class MonitoringAttributeStatistics {
|
||||
this.lower_bound = lower_bound;
|
||||
}
|
||||
|
||||
public void update_attribute_statistics(double new_attribute_value){
|
||||
public void update_attribute_statistics(Number new_attribute_value){
|
||||
count++;
|
||||
|
||||
double mean_differential = (new_attribute_value - current_mean) / count;
|
||||
double mean_differential = (new_attribute_value.doubleValue() - current_mean) / count;
|
||||
double new_mean = current_mean + mean_differential;
|
||||
|
||||
double dsquared_increment = (new_attribute_value - new_mean) * (new_attribute_value - current_mean);
|
||||
double dsquared_increment = (new_attribute_value.doubleValue() - new_mean) * (new_attribute_value.doubleValue() - current_mean);
|
||||
double new_dsquared = current_dsquared + dsquared_increment;
|
||||
|
||||
current_mean = new_mean;
|
||||
@ -59,7 +59,7 @@ public class MonitoringAttributeStatistics {
|
||||
|
||||
if (!hard_upper_bound_is_set){
|
||||
if (count==1) {
|
||||
upper_bound = new_attribute_value;
|
||||
upper_bound = new_attribute_value.doubleValue();
|
||||
}else {
|
||||
|
||||
double candidate_upper_value = new_mean + Math.sqrt(10.0) * Math.sqrt(new_dsquared / (count - 1)); //Chebyshev-based 90th percentile value
|
||||
@ -70,7 +70,7 @@ public class MonitoringAttributeStatistics {
|
||||
}
|
||||
if (!hard_lower_bound_is_set) {
|
||||
if (count==1){
|
||||
lower_bound = new_attribute_value;
|
||||
lower_bound = new_attribute_value.doubleValue();
|
||||
}else {
|
||||
double candidate_lower_value = new_mean - Math.sqrt(10.0) * Math.sqrt(new_dsquared / (count - 1)); //Chebyshev-based 90th percentile value
|
||||
//if (candidate_lower_value < lower_bound) {
|
||||
|
@ -49,7 +49,7 @@ public class PredictedMonitoringAttribute {
|
||||
this.initialized = true;
|
||||
this.name = name;
|
||||
this.threshold = threshold;
|
||||
double current_value = RealtimeMonitoringAttribute.get_metric_value(detector,name);
|
||||
double current_value = detector.get_metric_value(name);
|
||||
if (Double.isNaN(current_value)){
|
||||
Logger.getGlobal().log(info_logging_level,"Detected NaN value for metric "+name+". Thus we cannot compute severity although a predicted value of "+forecasted_value+" has arrived");
|
||||
this.initialized = false;
|
||||
|
@ -14,84 +14,83 @@ import utilities.MathUtils;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Integer.valueOf;
|
||||
import static utility_beans.PredictedMonitoringAttribute.*;
|
||||
|
||||
public class RealtimeMonitoringAttribute {
|
||||
|
||||
protected String name;
|
||||
private Double upper_bound;
|
||||
private Double lower_bound;
|
||||
private CircularFifoQueue<Double> actual_metric_values = new CircularFifoQueue<>(kept_values_per_metric); //the previous actual values of the metric
|
||||
public enum AttributeValuesType{Integer, Unknown, Double}
|
||||
private AttributeValuesType type;
|
||||
private Number upper_bound;
|
||||
private Number lower_bound;
|
||||
private CircularFifoQueue<Number> actual_metric_values = new CircularFifoQueue<>(kept_values_per_metric); //the previous actual values of the metric
|
||||
|
||||
public RealtimeMonitoringAttribute(String name, Double lower_bound, Double upper_bound){
|
||||
public RealtimeMonitoringAttribute(String name, Number lower_bound, Number upper_bound,AttributeValuesType type){
|
||||
this.name = name;
|
||||
this.lower_bound = lower_bound;
|
||||
this.upper_bound = upper_bound;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public RealtimeMonitoringAttribute(String name, Collection<Double> values){
|
||||
this.name = name;
|
||||
public RealtimeMonitoringAttribute(String name, Collection<Double> values,AttributeValuesType type){
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
this.upper_bound= 100.0;
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
//Equivalent to below: values.stream().forEach(x -> actual_metric_values.add(x));
|
||||
actual_metric_values.addAll(values);
|
||||
}
|
||||
public RealtimeMonitoringAttribute(String name, Double value){
|
||||
public RealtimeMonitoringAttribute(String name, Number value){
|
||||
this.name = name;
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
this.lower_bound = 0;
|
||||
this.upper_bound= 100;
|
||||
if (value instanceof Integer){
|
||||
this.type = AttributeValuesType.Integer;
|
||||
}else if (value instanceof Double){
|
||||
this.type = AttributeValuesType.Double;
|
||||
}
|
||||
actual_metric_values.add(value);
|
||||
}
|
||||
|
||||
public RealtimeMonitoringAttribute(String name){
|
||||
public RealtimeMonitoringAttribute(String name,Boolean has_infinite_bounds,AttributeValuesType type){
|
||||
this.name = name;
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
}
|
||||
|
||||
public static Double get_metric_value(DetectorSubcomponent detector, String metric_name){
|
||||
CircularFifoQueue<Double> actual_metric_values = detector.getSubcomponent_state().getMonitoring_attributes().get(metric_name).getActual_metric_values();
|
||||
if (actual_metric_values.size()==0){
|
||||
Logger.getGlobal().log(warning_logging_level,"Trying to retrieve realtime values from an empty queue for metric "+metric_name);
|
||||
this.type = type;
|
||||
if (has_infinite_bounds){
|
||||
if (type==AttributeValuesType.Double) {
|
||||
this.upper_bound = Double.POSITIVE_INFINITY;
|
||||
this.lower_bound = Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
else if (type==AttributeValuesType.Integer){
|
||||
this.upper_bound = Integer.MAX_VALUE;
|
||||
this.lower_bound = Integer.MIN_VALUE;
|
||||
}
|
||||
}else {
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
}
|
||||
return aggregate_metric_values(actual_metric_values);
|
||||
}
|
||||
|
||||
private static Double aggregate_metric_values(Iterable<Double> metric_values) {
|
||||
public static Double aggregate_metric_values(Iterable<Number> metric_values) {
|
||||
return MathUtils.get_average(metric_values);
|
||||
}
|
||||
|
||||
public static void update_monitoring_attribute_value(DetectorSubcomponent detector, String name,Double value){
|
||||
if(detector.getSubcomponent_state().getMonitoring_attributes().get(name)==null){
|
||||
detector.getSubcomponent_state().getMonitoring_attributes().put(name,new RealtimeMonitoringAttribute(name));
|
||||
//monitoring_attributes_max_values.put(name,value);
|
||||
//monitoring_attributes_min_values.put(name,value);
|
||||
|
||||
}
|
||||
detector.getSubcomponent_state().getMonitoring_attributes().get(name).getActual_metric_values().add(value);
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_statistics().get(name).update_attribute_statistics(value);
|
||||
/*
|
||||
if(get_90th_percentile_high_value(name,value)>monitoring_attributes_max_values.get(name)){
|
||||
monitoring_attributes_max_values.put(name,value);
|
||||
}else if (get_90th_percentile_low_value(name,value)<monitoring_attributes_min_values.get(name)){
|
||||
monitoring_attributes_min_values.put(name,value);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
public static <T extends Iterable<String>> void initialize_monitoring_attribute_rates_of_change(DetectorSubcomponent detector, T metric_names){
|
||||
initialize_monitoring_attribute_hashmap(detector.getSubcomponent_state().getMonitoring_attributes(),metric_names);
|
||||
initialize_attribute_value_hashmap(getAttributes_maximum_rate_of_change(),metric_names);
|
||||
initialize_attribute_value_hashmap(getAttributes_minimum_rate_of_change(),metric_names);
|
||||
initialize_attribute_double_value_hashmap(getAttributes_maximum_rate_of_change(),metric_names);
|
||||
initialize_attribute_double_value_hashmap(getAttributes_minimum_rate_of_change(),metric_names);
|
||||
}
|
||||
|
||||
public static <T extends Iterable<String>> void initialize_monitoring_attribute_hashmap(HashMap<String, RealtimeMonitoringAttribute> map, T metric_names){
|
||||
public static <T extends Iterable<String>> void initialize_monitoring_attribute_hashmap(HashMap<String, RealtimeMonitoringAttribute> map, T metric_names){
|
||||
for (String metric_name : metric_names){
|
||||
map.put(metric_name,new RealtimeMonitoringAttribute(metric_name));
|
||||
map.put(metric_name,new RealtimeMonitoringAttribute(metric_name,false,AttributeValuesType.Unknown));
|
||||
}
|
||||
}
|
||||
|
||||
@ -101,14 +100,14 @@ public class RealtimeMonitoringAttribute {
|
||||
}
|
||||
}
|
||||
|
||||
public static <T extends HashMap<String, RealtimeMonitoringAttribute>> void initialize_monitoring_attributes (DetectorSubcomponent detector, T metric_names_bounds){
|
||||
public static void initialize_monitoring_attributes (DetectorSubcomponent detector, HashMap<String,RealtimeMonitoringAttribute> metric_names_bounds){
|
||||
for (String metric_name : metric_names_bounds.keySet()) {
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_statistics().put(metric_name, new
|
||||
MonitoringAttributeStatistics(metric_names_bounds.get(metric_name).lower_bound,metric_names_bounds.get(metric_name).upper_bound));
|
||||
MonitoringAttributeStatistics(metric_names_bounds.get(metric_name).lower_bound.doubleValue(),metric_names_bounds.get(metric_name).upper_bound.doubleValue()));
|
||||
}
|
||||
}
|
||||
|
||||
private static <T extends Iterable<String>> void initialize_attribute_value_hashmap(HashMap<String,Double> hashmap ,T metric_names){
|
||||
private static <T extends Iterable<String>> void initialize_attribute_double_value_hashmap(HashMap<String,Double> hashmap , T metric_names){
|
||||
for (String metric_name: metric_names){
|
||||
hashmap.put(metric_name,0.0);
|
||||
}
|
||||
@ -122,27 +121,34 @@ public class RealtimeMonitoringAttribute {
|
||||
return name;
|
||||
}
|
||||
|
||||
public CircularFifoQueue<Double> getActual_metric_values() {
|
||||
public CircularFifoQueue<Number> getActual_metric_values() {
|
||||
return actual_metric_values;
|
||||
}
|
||||
|
||||
public void setActual_metric_values(CircularFifoQueue<Double> actual_metric_values) {
|
||||
public void setActual_metric_values(CircularFifoQueue<Number> actual_metric_values) {
|
||||
this.actual_metric_values = actual_metric_values;
|
||||
}
|
||||
|
||||
public Double getUpper_bound() {
|
||||
return upper_bound;
|
||||
return upper_bound.doubleValue();
|
||||
}
|
||||
|
||||
public void setUpper_bound(Double upper_bound) {
|
||||
public void setUpper_bound(Number upper_bound) {
|
||||
this.upper_bound = upper_bound;
|
||||
}
|
||||
|
||||
public Double getLower_bound() {
|
||||
return lower_bound;
|
||||
return lower_bound.doubleValue();
|
||||
}
|
||||
|
||||
public void setLower_bound(Double lower_bound) {
|
||||
public void setLower_bound(Number lower_bound) {
|
||||
this.lower_bound = lower_bound;
|
||||
}
|
||||
|
||||
public void setType(AttributeValuesType type){
|
||||
this.type = type;
|
||||
}
|
||||
public AttributeValuesType getType(){
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
@ -17,17 +17,28 @@ import static configuration.Constants.EMPTY;
|
||||
|
||||
|
||||
public class SynchronizedStringMap {
|
||||
private Map<String, String> synchronized_map = Collections.synchronizedMap(new HashMap<>()); // using Collections.synchronized map as we intend to add/remove topics to the map dynamically
|
||||
public String get_synchronized_contents(String name){
|
||||
if (synchronized_map.containsKey(name)) {
|
||||
return synchronized_map.get(name);
|
||||
private Map<String, Map<String,String>> synchronized_map = Collections.synchronizedMap(new HashMap<>()); // using Collections.synchronized map as we intend to add/remove topics to the map dynamically
|
||||
public String get_synchronized_contents(String application_name, String topic_name){
|
||||
if (synchronized_map.containsKey(application_name)) {
|
||||
if(synchronized_map.get(application_name).containsKey(topic_name)){
|
||||
return synchronized_map.get(application_name).get(topic_name);
|
||||
}else{
|
||||
synchronized_map.get(application_name).put(topic_name,EMPTY);
|
||||
return EMPTY;
|
||||
}
|
||||
}else{
|
||||
synchronized_map.put(name,new String(EMPTY));
|
||||
return synchronized_map.get(name);
|
||||
HashMap new_map = new HashMap<>();
|
||||
synchronized_map.put(application_name,Collections.synchronizedMap(new HashMap<>()));
|
||||
synchronized_map.get(application_name).put(topic_name,EMPTY);
|
||||
return EMPTY;
|
||||
}
|
||||
}
|
||||
public String assign_value(String topic, String value){
|
||||
synchronized_map.put(topic,value);
|
||||
return synchronized_map.get(topic);
|
||||
public void assign_value(String application_name, String topic, String value){
|
||||
if (synchronized_map.containsKey(application_name)){
|
||||
synchronized_map.get(application_name).put(topic,value);
|
||||
}else{
|
||||
synchronized_map.put(application_name,Collections.synchronizedMap(new HashMap<>()));
|
||||
synchronized_map.get(application_name).put(topic,value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -11,5 +11,5 @@ broker_password = admin
|
||||
|
||||
slo_violation_probability_threshold = 0.1
|
||||
slo_violation_determination_method = prconf-delta
|
||||
time_horizon_seconds = 120
|
||||
time_horizon_seconds = 5
|
||||
maximum_acceptable_forward_predictions = 30
|
@ -46,7 +46,7 @@ public class ConnectivityTests {
|
||||
|
||||
BrokerPublisher publisher = new BrokerPublisher("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"),amq_library_configuration_location);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"),amq_library_configuration_location,default_application_name);
|
||||
|
||||
JSONObject object_to_publish = new JSONObject();
|
||||
object_to_publish.put("ram","95");
|
||||
|
@ -19,7 +19,6 @@ import static configuration.Constants.roc_limit;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getAttributes_maximum_rate_of_change;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getAttributes_minimum_rate_of_change;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value;
|
||||
|
||||
public class DerivedMonitoringAttributeTests {
|
||||
|
||||
@ -28,7 +27,7 @@ public class DerivedMonitoringAttributeTests {
|
||||
public void roc_calculation_test(){
|
||||
|
||||
simple_initialize_0_100_bounded_attributes(detector, List.of("cpu"));
|
||||
update_monitoring_attribute_value(detector,"cpu",0.0);
|
||||
detector.update_monitoring_attribute_value("cpu",0.0);
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put("cpu", new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound
|
||||
|
||||
getAttributes_maximum_rate_of_change().put("cpu",roc_limit);
|
||||
|
@ -36,7 +36,7 @@ public class SeverityTests {
|
||||
for(String monitoring_metric_name : metric_names) {
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put(monitoring_metric_name, new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound
|
||||
}
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value(detector,"cpu",0.0);
|
||||
detector.update_monitoring_attribute_value("cpu",0.0);
|
||||
|
||||
PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector,"cpu",70,1,100.0,100,10,System.currentTimeMillis(),System.currentTimeMillis()+20000);
|
||||
|
||||
@ -61,7 +61,7 @@ public class SeverityTests {
|
||||
for(String monitoring_metric_name : metric_names) {
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put(monitoring_metric_name, new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound
|
||||
}
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value(detector,"cpu",30.0);
|
||||
detector.update_monitoring_attribute_value("cpu",30.0);
|
||||
|
||||
PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector,"cpu",70,1,80.0,90,5,System.currentTimeMillis(),System.currentTimeMillis()+20000);
|
||||
|
||||
@ -86,7 +86,7 @@ public class SeverityTests {
|
||||
for(String monitoring_metric_name : metric_names) {
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().put(monitoring_metric_name, new MonitoringAttributeStatistics()); //The rate of change of a metric, is a metric which itself should be monitored for its upper bound
|
||||
}
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value(detector,"cpu",86.0);
|
||||
detector.update_monitoring_attribute_value("cpu",86.0);
|
||||
|
||||
PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector,"cpu",75,1,92.0,88,7.8,System.currentTimeMillis(),System.currentTimeMillis()+20000);
|
||||
|
||||
|
@ -44,7 +44,6 @@ import static slo_rule_modelling.SLORule.process_rule_value;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponentUtilities.initialize_subrule_and_attribute_associations;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value;
|
||||
|
||||
class MetricConfiguration{
|
||||
public String name;
|
||||
@ -117,11 +116,11 @@ public class UnboundedMonitoringAttributeTests {
|
||||
|
||||
String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric_name);
|
||||
Logger.getGlobal().log(Level.INFO, "Starting realtime subscription at " + realtime_metric_topic_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location,default_application_name);
|
||||
BiFunction<String, String, String> function = (topic, message) -> {
|
||||
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(topic)) {
|
||||
try {
|
||||
update_monitoring_attribute_value(detector,topic, ((Number) ((JSONObject) new JSONParser().parse(message)).get("metricValue")).doubleValue());
|
||||
detector.update_monitoring_attribute_value(topic, ((Number) ((JSONObject) new JSONParser().parse(message)).get("metricValue")).doubleValue());
|
||||
|
||||
Logger.getGlobal().log(info_logging_level, "RECEIVED message with value for " + topic + " equal to " + (((JSONObject) new JSONParser().parse(message)).get("metricValue")));
|
||||
} catch (ParseException e) {
|
||||
@ -139,9 +138,8 @@ public class UnboundedMonitoringAttributeTests {
|
||||
|
||||
|
||||
String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric_name);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
|
||||
BiFunction<String,String,String> forecasted_function = (topic,message) ->{
|
||||
String predicted_attribute_name = topic.replaceFirst("prediction\\.",EMPTY);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,default_application_name);
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> forecasted_function = (broker_details,message) ->{
|
||||
HashMap<Integer, HashMap<Long, PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
|
||||
try {
|
||||
double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.metricValue.name())).doubleValue();
|
||||
@ -151,7 +149,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
double confidence_interval = ((Number)json_array_confidence_interval.get(1)).doubleValue() - ((Number)json_array_confidence_interval.get(0)).doubleValue();
|
||||
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue();
|
||||
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.predictionTime.name())).longValue();
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+ metric_name +" equal to "+ forecasted_value);
|
||||
|
||||
synchronized (detector.can_modify_slo_rules) {
|
||||
if(!detector.can_modify_slo_rules.getValue()) {
|
||||
@ -168,12 +166,12 @@ public class UnboundedMonitoringAttributeTests {
|
||||
}
|
||||
}
|
||||
//predicted_attributes.get(predicted_attribute_name).clear();
|
||||
for (SLOSubRule subrule : SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(predicted_attribute_name)) {
|
||||
for (SLOSubRule subrule : SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(metric_name)) {
|
||||
getPredicted_monitoring_attributes().computeIfAbsent(subrule.getId(), k -> new HashMap<>());
|
||||
if ( (getPredicted_monitoring_attributes().get(subrule.getId()).get(targeted_prediction_time)!=null) &&(getPredicted_monitoring_attributes().get(subrule.getId()).get(targeted_prediction_time).getTimestamp()>timestamp)){
|
||||
//do nothing, as in this case an older prediction has arrived for a metric delayed, and so it should be disregarded
|
||||
}else {
|
||||
PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector, predicted_attribute_name, subrule.getThreshold(), subrule.getId(), forecasted_value, probability_confidence, confidence_interval, timestamp,targeted_prediction_time);
|
||||
PredictedMonitoringAttribute prediction_attribute = new PredictedMonitoringAttribute(detector, metric_name, subrule.getThreshold(), subrule.getId(), forecasted_value, probability_confidence, confidence_interval, timestamp,targeted_prediction_time);
|
||||
|
||||
//predicted_attributes.get(predicted_attribute_name).add(prediction_attribute);
|
||||
subrule.setAssociated_predicted_monitoring_attribute(prediction_attribute);
|
||||
@ -182,6 +180,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
}
|
||||
}
|
||||
detector.can_modify_slo_rules.setValue(true);
|
||||
detector.can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
//SLOViolationCalculator.get_Severity_all_metrics_method(prediction_attribute)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user