Miscellaneous changes
Update to using better logging Subscription and parsing of metric_list message Major functionality updates to BrokerPublisher and BrokerSubscriber Introduction of custom helper classes Change-Id: I94a6fdb4612de292c24511445f1236cdce94b363
This commit is contained in:
parent
4ec762fd60
commit
94672e4461
@ -19,4 +19,6 @@ COPY src/main/resources/config/eu.melodic.event.brokerclient.properties /home/sr
|
||||
COPY --from=build /home/app/target/SLO-Violation-Detector-4.0-SNAPSHOT.jar /home/SLOSeverityCalculator-4.0-SNAPSHOT.jar
|
||||
WORKDIR /home
|
||||
ENV LOG_FILE /home/slo_violation_detector.log
|
||||
CMD ["/bin/sh","-c","java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar > $LOG_FILE 2>&1"]
|
||||
#CMD ["/bin/sh","-c","java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar > $LOG_FILE 2>&1"]
|
||||
#CMD ["/bin/sh","-c","java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar 2>&1 | tee $LOG_FILE"]
|
||||
CMD ["/bin/sh","-c","java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar 2>&1"]
|
@ -56,9 +56,23 @@
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>eu.nebulouscloud</groupId>
|
||||
<artifactId>exn-connector-java</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
@ -129,21 +143,9 @@
|
||||
<url>https://repo1.maven.org/maven2/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>gitlab-maven-morphemic-preprocessor</id>
|
||||
<url>https://gitlab.ow2.org/api/v4/projects/1370/packages/maven</url>
|
||||
<id>nexus-nebulous</id>
|
||||
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<distributionManagement>
|
||||
<snapshotRepository>
|
||||
<id>eu.7bulls</id>
|
||||
<name>Melodic 7bulls repository</name>
|
||||
<url>https://nexus.7bulls.eu:8443/repository/maven-snapshots/</url>
|
||||
</snapshotRepository>
|
||||
<repository>
|
||||
<id>eu.7bulls</id>
|
||||
<name>Melodic 7bulls repository</name>
|
||||
<url>https://nexus.7bulls.eu:8443/repository/maven-releases/</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
</project>
|
||||
|
@ -23,15 +23,15 @@ public class Constants {
|
||||
public static int time_horizon_seconds;
|
||||
public static int maximum_acceptable_forward_predictions;
|
||||
public static String [] logic_operators = {"and","or"};
|
||||
public static final String default_handled_application_name = "default_application";
|
||||
public static final String default_application_name = "default_application";
|
||||
public static final String slovid_publisher_key = "slovid_publisher";
|
||||
public static URI base_project_path;
|
||||
public static String configuration_file_location = "src/main/resources/config/input_data.properties";
|
||||
public static String amq_library_configuration_location = "src/main/resources/config/eu.melodic.event.brokerclient.properties";
|
||||
public static String configuration_file_location = "slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties";
|
||||
public static String amq_library_configuration_location = "slo-violation-detector/src/main/resources/config/eu.melodic.event.brokerclient.properties";
|
||||
public static String topic_for_severity_announcement = "prediction.slo_severity_value";
|
||||
public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost";
|
||||
public static String slo_rules_topic = "eu.nebulouscloud.monitoring.slo.new";
|
||||
public static String metric_list_topic = "eu.nebulouscloud.monitoring.metric_list";
|
||||
public static ArrayList<String> director_subscription_topics;
|
||||
public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection
|
||||
public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation
|
||||
public static String roc_calculation_mode = "prototype";
|
||||
|
@ -43,7 +43,7 @@ public class AttributeSubscription {
|
||||
for (String metric:slo_rule.get_monitoring_attributes()){
|
||||
|
||||
String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Starting realtime subscription at "+realtime_metric_topic_name);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting realtime subscription at "+realtime_metric_topic_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
|
||||
BiFunction<String,String,String> function = (topic, message) ->{
|
||||
RealtimeMonitoringAttribute realtimeMonitoringAttribute = new RealtimeMonitoringAttribute(topic);
|
||||
@ -51,13 +51,13 @@ public class AttributeSubscription {
|
||||
try {
|
||||
update_monitoring_attribute_value(detector,topic,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue());
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"RECEIVED message with value for "+topic+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue")));
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+topic+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue")));
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"A parsing exception was caught while parsing message: "+message);
|
||||
Logger.getGlobal().log(info_logging_level,"A parsing exception was caught while parsing message: "+message);
|
||||
} catch (Exception e){
|
||||
e.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"An unknown exception was caught while parsing message: "+message);
|
||||
Logger.getGlobal().log(info_logging_level,"An unknown exception was caught while parsing message: "+message);
|
||||
}
|
||||
}
|
||||
return message;
|
||||
@ -69,12 +69,12 @@ public class AttributeSubscription {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
}catch (Exception i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Possible interruption of realtime subscriber thread for "+realtime_metric_topic_name+" - if not stacktrace follows");
|
||||
Logger.getGlobal().log(info_logging_level,"Possible interruption of realtime subscriber thread for "+realtime_metric_topic_name+" - if not stacktrace follows");
|
||||
if (! (i instanceof InterruptedException)){
|
||||
i.printStackTrace();
|
||||
}
|
||||
}finally{
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing realtime subscriber thread for "+realtime_metric_topic_name);
|
||||
Logger.getGlobal().log(info_logging_level,"Removing realtime subscriber thread for "+realtime_metric_topic_name);
|
||||
detector.getSubcomponent_state().slo_bound_running_threads.remove("realtime_subscriber_thread_" + realtime_metric_topic_name);
|
||||
}
|
||||
};
|
||||
@ -83,7 +83,7 @@ public class AttributeSubscription {
|
||||
|
||||
|
||||
String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Starting forecasted metric subscription at "+forecasted_metric_topic_name);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting forecasted metric subscription at "+forecasted_metric_topic_name);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
|
||||
|
||||
BiFunction<String,String,String> forecasted_function = (topic,message) ->{
|
||||
@ -98,13 +98,13 @@ public class AttributeSubscription {
|
||||
try{
|
||||
confidence_interval = ((Number) json_array_confidence_interval.get(1)).doubleValue() - ((Number) json_array_confidence_interval.get(0)).doubleValue();
|
||||
}catch (ClassCastException | NumberFormatException c){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Catching exception successfully");
|
||||
Logger.getGlobal().log(info_logging_level,"Catching exception successfully");
|
||||
c.printStackTrace();
|
||||
confidence_interval = Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue();
|
||||
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.prediction_time)).longValue();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
|
||||
|
||||
synchronized (detector.ADAPTATION_TIMES_MODIFY) {
|
||||
@ -112,13 +112,13 @@ public class AttributeSubscription {
|
||||
try {
|
||||
detector.ADAPTATION_TIMES_MODIFY.wait();
|
||||
} catch (InterruptedException e) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Interrupted while waiting to access the lock for adaptation times object");
|
||||
Logger.getGlobal().log(warning_logging_level,"Interrupted while waiting to access the lock for adaptation times object");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
detector.ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
if (!detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time) && (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(targeted_prediction_time)) && ((targeted_prediction_time * 1000 - time_horizon_seconds * 1000L) > (Clock.systemUTC()).millis())) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Adding a new targeted prediction time " + targeted_prediction_time + " expiring in "+(targeted_prediction_time*1000-System.currentTimeMillis())+" from topic "+topic);
|
||||
Logger.getGlobal().log(info_logging_level, "Adding a new targeted prediction time " + targeted_prediction_time + " expiring in "+(targeted_prediction_time*1000-System.currentTimeMillis())+" from topic "+topic);
|
||||
detector.getSubcomponent_state().adaptation_times.add(targeted_prediction_time);
|
||||
synchronized (detector.PREDICTION_EXISTS) {
|
||||
detector.PREDICTION_EXISTS.setValue(true);
|
||||
@ -126,12 +126,12 @@ public class AttributeSubscription {
|
||||
}
|
||||
}else {
|
||||
if (detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time)) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it is already present");
|
||||
Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it is already present");
|
||||
} else if (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(targeted_prediction_time)) {
|
||||
if (targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - (Clock.systemUTC()).millis() <= 0) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it would expire in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()) + " milliseconds and the prediction horizon is " + time_horizon_seconds * 1000L + " milliseconds");
|
||||
Logger.getGlobal().log(info_logging_level, "Could not add the new targeted prediction time " + targeted_prediction_time + " from topic " + topic + " as it would expire in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()) + " milliseconds and the prediction horizon is " + time_horizon_seconds * 1000L + " milliseconds");
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Adding new prediction time "+targeted_prediction_time+" which expires in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()));
|
||||
Logger.getGlobal().log(info_logging_level,"Adding new prediction time "+targeted_prediction_time+" which expires in " + (targeted_prediction_time * 1000 - System.currentTimeMillis()));
|
||||
detector.getSubcomponent_state().adaptation_times_pending_processing.add(targeted_prediction_time);
|
||||
}
|
||||
}
|
||||
@ -166,11 +166,11 @@ public class AttributeSubscription {
|
||||
} catch (ParseException p){
|
||||
p.printStackTrace();
|
||||
} catch (InterruptedException e) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Monitoring attribute subscription thread for prediction attribute "+predicted_attribute_name+" is stopped");
|
||||
Logger.getGlobal().log(info_logging_level,"Monitoring attribute subscription thread for prediction attribute "+predicted_attribute_name+" is stopped");
|
||||
} catch (ClassCastException | NumberFormatException n){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Error while trying to parse message\n"+message);
|
||||
Logger.getGlobal().log(info_logging_level,"Error while trying to parse message\n"+message);
|
||||
} catch (Exception e){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"An unknown exception was caught\n"+message);
|
||||
Logger.getGlobal().log(info_logging_level,"An unknown exception was caught\n"+message);
|
||||
}
|
||||
return message;
|
||||
};
|
||||
@ -185,12 +185,12 @@ public class AttributeSubscription {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
}catch (Exception i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Possible interruption of forecasting subscriber thread for "+forecasted_metric_topic_name+" - if not stacktrace follows");
|
||||
Logger.getGlobal().log(info_logging_level,"Possible interruption of forecasting subscriber thread for "+forecasted_metric_topic_name+" - if not stacktrace follows");
|
||||
if (! (i instanceof InterruptedException)){
|
||||
i.printStackTrace();
|
||||
}
|
||||
}finally {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing forecasting subscriber thread for "+forecasted_metric_topic_name);
|
||||
Logger.getGlobal().log(info_logging_level,"Removing forecasting subscriber thread for "+forecasted_metric_topic_name);
|
||||
detector.getSubcomponent_state().persistent_running_detector_threads.remove("forecasting_subscriber_thread_"+forecasted_metric_topic_name);
|
||||
}
|
||||
};
|
||||
|
@ -3,7 +3,7 @@ package runtime;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
|
||||
import static configuration.Constants.default_handled_application_name;
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static runtime.Main.detectors;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_integer_id;
|
||||
import static utilities.DebugDataSubscription.debug_data_generation;
|
||||
@ -14,7 +14,7 @@ public class DetectorRequestMappings {
|
||||
|
||||
@RequestMapping("/add-new-detector")
|
||||
public static String start_new_detector_subcomponent() {
|
||||
detectors.add(new DetectorSubcomponent(default_handled_application_name,detached));
|
||||
detectors.add(new DetectorSubcomponent(default_application_name,detached));
|
||||
return ("Spawned new SLO Detector subcomponent instance! Currently, there have been "+detector_integer_id+" detectors spawned");
|
||||
}
|
||||
|
||||
|
@ -15,7 +15,7 @@ import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import static configuration.Constants.default_handled_application_name;
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
|
||||
|
||||
@RestController
|
||||
@ -33,7 +33,7 @@ public class DirectorRequestMappings {
|
||||
return "Error in parsing the input string, the exception message follows:\n"+e;
|
||||
}
|
||||
application_name = (String) rule_representation_json.get("name");
|
||||
DetectorSubcomponent new_detector = DetectorSubcomponent.detector_subcomponents.getOrDefault(application_name,new DetectorSubcomponent(default_handled_application_name,CharacterizedThread.CharacterizedThreadRunMode.detached));
|
||||
DetectorSubcomponent new_detector = DetectorSubcomponent.detector_subcomponents.getOrDefault(application_name,new DetectorSubcomponent(default_application_name,CharacterizedThread.CharacterizedThreadRunMode.detached));
|
||||
new_detector.slo_rule_topic_subscriber_function.apply(Constants.slo_rules_topic,string_rule_representation);
|
||||
return ("New application was spawned");
|
||||
}
|
||||
@ -58,17 +58,17 @@ public class DirectorRequestMappings {
|
||||
for (Object metric : metrics_json_array){
|
||||
JSONObject metric_json = (JSONObject) metric;
|
||||
String metric_name = (String) metric_json.get("name");
|
||||
int upper_bound = 100,lower_bound = 0;
|
||||
double upper_bound = 100.0,lower_bound = 0.0;
|
||||
if (((String) metric_json.get("upper_bound")).toLowerCase().contains("-inf")){
|
||||
upper_bound = -Integer.MAX_VALUE;
|
||||
upper_bound = Double.NEGATIVE_INFINITY;
|
||||
}else if (((String) metric_json.get("upper_bound")).toLowerCase().contains("inf")){
|
||||
upper_bound = Integer.MAX_VALUE;
|
||||
upper_bound = Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
if (((String) metric_json.get("lower_bound")).toLowerCase().contains("-inf")){
|
||||
lower_bound = -Integer.MAX_VALUE;
|
||||
lower_bound = Double.POSITIVE_INFINITY;
|
||||
}
|
||||
else if (((String) metric_json.get("lower_bound")).toLowerCase().contains("inf")){
|
||||
lower_bound = Integer.MAX_VALUE;
|
||||
lower_bound = Double.POSITIVE_INFINITY;
|
||||
}
|
||||
application_metrics.put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound));
|
||||
}
|
||||
|
@ -53,11 +53,11 @@ public class Main {
|
||||
operational_mode = getSLOViolationDetectionOperationalMode("DIRECTOR");
|
||||
inputStream = getPreferencesFileInputStream(EMPTY);
|
||||
} else if (args.length == 1) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Operational mode has been manually specified");
|
||||
Logger.getGlobal().log(info_logging_level, "Operational mode has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[0]);
|
||||
inputStream = getPreferencesFileInputStream(EMPTY);
|
||||
} else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Operational mode and preferences file has been manually specified");
|
||||
Logger.getGlobal().log(info_logging_level, "Operational mode and preferences file has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[0]);
|
||||
inputStream = getPreferencesFileInputStream(args[1]);
|
||||
|
||||
@ -72,8 +72,8 @@ public class Main {
|
||||
slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold"));
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions"));
|
||||
director_subscription_topics = get_director_subscription_topics();
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_handled_application_name,detached);
|
||||
//director_subscription_topics = get_director_subscription_topics();
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,detached);
|
||||
detectors.add(detector);
|
||||
ArrayList<String> unbounded_metric_strings = new ArrayList<>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
for (String metric_string : unbounded_metric_strings) {
|
||||
@ -81,18 +81,18 @@ public class Main {
|
||||
}
|
||||
} //initialization
|
||||
if (operational_mode.equals(OperationalMode.DETECTOR)) {
|
||||
Logger.getAnonymousLogger().log(INFO,"Starting new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector
|
||||
Logger.getGlobal().log(INFO,"Starting new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector
|
||||
}else if (operational_mode.equals(OperationalMode.DIRECTOR)){
|
||||
Logger.getAnonymousLogger().log(INFO,"Starting new Director and new Detector instance");
|
||||
Logger.getGlobal().log(INFO,"Starting new Director and new Detector instance");
|
||||
DirectorSubcomponent director = new DirectorSubcomponent();
|
||||
SpringApplication.run(Main.class, args);
|
||||
Logger.getAnonymousLogger().log(INFO,"Execution completed");
|
||||
Logger.getGlobal().log(INFO,"Execution completed");
|
||||
}
|
||||
}catch (IOException e){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Problem reading input file");
|
||||
Logger.getGlobal().log(info_logging_level,"Problem reading input file");
|
||||
e.printStackTrace();
|
||||
}catch (Exception e){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Miscellaneous issue during startup");
|
||||
Logger.getGlobal().log(info_logging_level,"Miscellaneous issue during startup");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ public class SLORule {
|
||||
try{
|
||||
Exception e = new Exception("An invalid rule was sent to the SLO Violation detector - ignoring the rule having the following representation\n"+rule_representation.toJSONString());
|
||||
}catch (Exception e){
|
||||
Logger.getAnonymousLogger().log(Level.SEVERE,"An invalid rule was sent to the SLO Violation detector");
|
||||
Logger.getGlobal().log(Level.SEVERE,"An invalid rule was sent to the SLO Violation detector");
|
||||
return rule_format;
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package slo_violation_detector_engine.detector;
|
||||
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import slo_violation_detector_engine.generic.Runnables;
|
||||
import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
@ -23,6 +25,7 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
private DetectorSubcomponentState subcomponent_state;
|
||||
public final AtomicBoolean stop_signal = new AtomicBoolean(false);
|
||||
public final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false);
|
||||
public final SynchronizedBoolean can_modify_monitoring_metrics = new SynchronizedBoolean(false);
|
||||
public SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap();
|
||||
|
||||
|
||||
@ -62,11 +65,41 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
}
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
public BiFunction<String, String, String> metric_list_subscriber_function = (topic, message) -> {
|
||||
synchronized (can_modify_monitoring_metrics) {
|
||||
can_modify_monitoring_metrics.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(topic, message);
|
||||
//TODO add monitoring metrics bounds
|
||||
String metric_name;
|
||||
double lower_bound,upper_bound;
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject metric_list_object;
|
||||
try {
|
||||
metric_list_object = (JSONObject) parser.parse(message);
|
||||
for (Object element : (JSONArray) metric_list_object.get("metric_list")){
|
||||
metric_name = (String)((JSONObject)element).get("name");
|
||||
lower_bound = (Double)((JSONObject)element).get("lower_bound");
|
||||
upper_bound = (Double)((JSONObject)element).get("upper_bound");
|
||||
subcomponent_state.getMonitoring_attributes().put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound));
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
//slo_rule_arrived.set(true);
|
||||
can_modify_monitoring_metrics.notifyAll();
|
||||
|
||||
Logger.getGlobal().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
}
|
||||
return "Monitoring metrics message processed";
|
||||
};
|
||||
public static BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
public static BiFunction<String, String, String> device_lost_subscriber_function = (topic, message) -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
@ -108,7 +108,7 @@ public class DetectorSubcomponentUtilities {
|
||||
Long possible_targeted_adaptation_time = possible_targeted_prediction_times.get(i);
|
||||
if (!detector.getSubcomponent_state().adaptation_times_pending_processing.contains(possible_targeted_adaptation_time)){
|
||||
detector.getSubcomponent_state().adaptation_times.remove(possible_targeted_adaptation_time);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing targeted prediction time "+possible_targeted_adaptation_time+" as it is going to be used");
|
||||
Logger.getGlobal().log(info_logging_level,"Removing targeted prediction time "+possible_targeted_adaptation_time+" as it is going to be used");
|
||||
detector.getSubcomponent_state().adaptation_times_pending_processing.add(possible_targeted_adaptation_time);
|
||||
return possible_targeted_adaptation_time;
|
||||
}
|
||||
@ -127,31 +127,32 @@ public class DetectorSubcomponentUtilities {
|
||||
|
||||
public static boolean slo_rule_arrived_has_updated_version(String rule_representation) {
|
||||
JSONObject json_object = null;
|
||||
long json_object_version = Integer.MAX_VALUE;
|
||||
long json_object_version = 1;
|
||||
try {
|
||||
json_object = (JSONObject) new JSONParser().parse(rule_representation);
|
||||
json_object_version = (Long) json_object.get("version");
|
||||
//json_object_version = (Long) json_object.get("version");
|
||||
json_object_version++;
|
||||
} catch (NullPointerException n){
|
||||
n.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored");
|
||||
Logger.getGlobal().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored");
|
||||
return false;
|
||||
} catch (Exception e){
|
||||
e.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule...");
|
||||
Logger.getGlobal().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule...");
|
||||
return false;
|
||||
}
|
||||
if (json_object_version > current_slo_rules_version){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived");
|
||||
Logger.getGlobal().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived");
|
||||
current_slo_rules_version=json_object_version;
|
||||
return true;
|
||||
}else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated");
|
||||
Logger.getGlobal().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static void stop_all_running_threads(DetectorSubcomponent associated_detector_subcomponent) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate");
|
||||
Logger.getGlobal().log(info_logging_level,"Asking previously existing threads to terminate");
|
||||
int initial_number_of_running_threads = associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size();
|
||||
while (associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size()>0) {
|
||||
synchronized (associated_detector_subcomponent.stop_signal) {
|
||||
@ -163,14 +164,14 @@ public class DetectorSubcomponentUtilities {
|
||||
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.values().forEach(Thread::interrupt);
|
||||
}catch (Exception e){
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads- associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size())+"/"+initial_number_of_running_threads+" already running threads");
|
||||
Logger.getGlobal().log(info_logging_level,"Stopped "+(initial_number_of_running_threads- associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size())+"/"+initial_number_of_running_threads+" already running threads");
|
||||
if (associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size()>1){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+ associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads);
|
||||
Logger.getGlobal().log(info_logging_level,"The threads which are still running are the following: "+ associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads);
|
||||
}else if (associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size()>0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+ associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads);
|
||||
Logger.getGlobal().log(info_logging_level,"The thread which is still running is the following: "+ associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads);
|
||||
}
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated");
|
||||
Logger.getGlobal().log(info_logging_level,"All threads have terminated");
|
||||
synchronized (associated_detector_subcomponent.stop_signal) {
|
||||
associated_detector_subcomponent.stop_signal.set(false);
|
||||
}
|
||||
@ -212,7 +213,7 @@ public class DetectorSubcomponentUtilities {
|
||||
metric_list.add((String) json_object.get("metric"));
|
||||
}
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string);
|
||||
Logger.getGlobal().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string);
|
||||
}
|
||||
}catch (Exception p){
|
||||
p.printStackTrace();
|
||||
@ -240,7 +241,7 @@ public class DetectorSubcomponentUtilities {
|
||||
*/
|
||||
return Math.min(rule_severity/100,100);
|
||||
}else if (slo_violation_determination_method.equals("prconf-delta")){
|
||||
//Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented");
|
||||
//Logger.getGlobal().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented");
|
||||
//return 0;
|
||||
if (rule_severity >= 6.52){
|
||||
return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1);
|
||||
@ -249,7 +250,7 @@ public class DetectorSubcomponentUtilities {
|
||||
}
|
||||
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method");
|
||||
Logger.getGlobal().log(warning_logging_level,"Unknown severity calculation method");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -259,20 +260,37 @@ public class DetectorSubcomponentUtilities {
|
||||
while (true) {
|
||||
if (first_run){
|
||||
//Creation of threads that should always run and are independent of the monitored application.
|
||||
//1. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered
|
||||
//2. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device
|
||||
//1. Creation of the metric list input subscriber thread, which listens for the metrics to be considered
|
||||
//2. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered
|
||||
//3. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device
|
||||
|
||||
|
||||
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
Runnable slo_rules_topic_subscriber_runnable = () -> {
|
||||
//Metric list subscription thread
|
||||
BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
Runnable metric_list_topic_subscriber_runnable = () -> {
|
||||
while (true) {
|
||||
slo_rule_topic_subscriber.subscribe(associated_detector_subcomponent.slo_rule_topic_subscriber_function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
metric_list_subscriber.subscribe(associated_detector_subcomponent.metric_list_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
|
||||
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
Logger.getGlobal().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}
|
||||
};
|
||||
CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,associated_detector_subcomponent);
|
||||
|
||||
|
||||
|
||||
//SLO rule subscription thread
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
Runnable slo_rules_topic_subscriber_runnable = () -> {
|
||||
while (true) {
|
||||
slo_rule_topic_subscriber.subscribe(associated_detector_subcomponent.slo_rule_topic_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
|
||||
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getGlobal().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -294,7 +312,7 @@ public class DetectorSubcomponentUtilities {
|
||||
}
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
Logger.getGlobal().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
}
|
||||
first_run = false;
|
||||
|
@ -1,14 +1,17 @@
|
||||
package slo_violation_detector_engine.director;
|
||||
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
|
||||
import utility_beans.CharacterizedThread;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import static utilities.OperationalModeUtils.get_director_publishing_topics;
|
||||
import static utilities.OperationalModeUtils.get_director_subscription_topics;
|
||||
|
||||
public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
public HashMap<String,Thread> persistent_running_director_threads = new HashMap<>();
|
||||
public Connector subscribing_connector;
|
||||
Integer id = 1;
|
||||
public static HashMap<String,DirectorSubcomponent> director_subcomponents = new HashMap<>();
|
||||
private static DirectorSubcomponent master_director;
|
||||
@ -32,6 +35,11 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
private void create_director_topic_subscribers(){
|
||||
for (String subscription_topic : get_director_subscription_topics()){
|
||||
//TODO subscribe to each topic, creating a Characterized thread for each of them
|
||||
|
||||
}
|
||||
for (String publishing_topic : get_director_publishing_topics()){
|
||||
//TODO do the same for publishing topics
|
||||
}
|
||||
//subscribing_connector = new Connector("slovid_director",)
|
||||
}
|
||||
}
|
||||
|
@ -41,12 +41,12 @@ public class Runnables {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
} catch (Exception i) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Possible interruption of debug data subscriber thread for " + debug_data_trigger_topic_name + " - if not stacktrace follows");
|
||||
Logger.getGlobal().log(info_logging_level, "Possible interruption of debug data subscriber thread for " + debug_data_trigger_topic_name + " - if not stacktrace follows");
|
||||
if (!(i instanceof InterruptedException)) {
|
||||
i.printStackTrace();
|
||||
}
|
||||
} finally {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Removing debug data subscriber thread for " + debug_data_trigger_topic_name);
|
||||
Logger.getGlobal().log(info_logging_level, "Removing debug data subscriber thread for " + debug_data_trigger_topic_name);
|
||||
detector.getSubcomponent_state().slo_bound_running_threads.remove("debug_data_subscription_thread_" + debug_data_trigger_topic_name);
|
||||
}
|
||||
}
|
||||
@ -54,12 +54,12 @@ public class Runnables {
|
||||
|
||||
public static Runnable device_lost_topic_subscriber_runnable = () -> {
|
||||
while (true) {
|
||||
device_lost_subscriber.subscribe(device_lost_subscriber_function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"A device used by the platform was lost, will therefore trigger a reconfiguration");
|
||||
device_lost_subscriber.subscribe(device_lost_subscriber_function, new AtomicBoolean(false)); //This subscriber should not be immune to stop signals
|
||||
Logger.getGlobal().log(info_logging_level,"A device used by the platform was lost, will therefore trigger a reconfiguration");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
Logger.getGlobal().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -108,7 +108,7 @@ public class Runnables {
|
||||
try {
|
||||
detector.ADAPTATION_TIMES_MODIFY.wait();
|
||||
} catch (InterruptedException e) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object");
|
||||
Logger.getGlobal().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
@ -122,7 +122,7 @@ public class Runnables {
|
||||
if (targeted_prediction_time == null) {
|
||||
continue;
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time);
|
||||
Logger.getGlobal().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time);
|
||||
Runnable internal_severity_calculation_runnable = () -> {
|
||||
try {
|
||||
synchronized (detector.PREDICTION_EXISTS) {
|
||||
@ -131,18 +131,18 @@ public class Runnables {
|
||||
|
||||
Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time;
|
||||
if (sleep_time <= 0) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000);
|
||||
Logger.getGlobal().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000);
|
||||
return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete)
|
||||
} else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)");
|
||||
Logger.getGlobal().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)");
|
||||
return; //The predictions are too near to the targeted reconfiguration tim
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds");
|
||||
Logger.getGlobal().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds");
|
||||
sleep(sleep_time);
|
||||
double rule_severity = process_rule_value(rule, targeted_prediction_time);
|
||||
double slo_violation_probability = determine_slo_violation_probability(rule_severity);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand");
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability < slo_violation_probability_threshold ? " so it will not be published" : " and it will be published"));
|
||||
Logger.getGlobal().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand");
|
||||
Logger.getGlobal().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability < slo_violation_probability_threshold ? " so it will not be published" : " and it will be published"));
|
||||
|
||||
if (slo_violation_probability >= slo_violation_probability_threshold) {
|
||||
JSONObject severity_json = new JSONObject();
|
||||
@ -167,13 +167,13 @@ public class Runnables {
|
||||
detector.ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping...");
|
||||
Logger.getGlobal().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping...");
|
||||
return;
|
||||
}
|
||||
};
|
||||
CharacterizedThread.create_new_thread(internal_severity_calculation_runnable, "internal_severity_calculation_thread_" + targeted_prediction_time, true,detector);
|
||||
} catch (NoSuchElementException n) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing...");
|
||||
Logger.getGlobal().log(warning_logging_level, "Could not calculate severity as a value was missing...");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ public class SLOViolationDetectorStateUtils {
|
||||
base_project_path = new File(EMPTY).toURI();
|
||||
URI absolute_configuration_file_path = new File(configuration_file_location).toURI();
|
||||
URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "This is the base project path:" + base_project_path);
|
||||
Logger.getGlobal().log(info_logging_level, "This is the base project path:" + base_project_path);
|
||||
return new FileInputStream(base_project_path.getPath() + relative_configuration_file_path);
|
||||
}else{
|
||||
if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) {
|
||||
|
@ -14,12 +14,18 @@ public class OperationalModeUtils{
|
||||
public static ArrayList<String> get_director_subscription_topics(){
|
||||
return new ArrayList<>
|
||||
(List.of(
|
||||
topic_for_severity_announcement,
|
||||
topic_for_lost_device_announcement,
|
||||
slo_rules_topic,
|
||||
metric_list_topic
|
||||
));
|
||||
}
|
||||
|
||||
public static ArrayList<String> get_director_publishing_topics(){
|
||||
return new ArrayList<>(
|
||||
List.of(
|
||||
topic_for_severity_announcement,
|
||||
topic_for_lost_device_announcement
|
||||
));
|
||||
}
|
||||
public static OperationalMode getSLOViolationDetectionOperationalMode(String operational_mode) {
|
||||
if (operational_mode.equalsIgnoreCase("DIRECTOR")){
|
||||
return OperationalMode.DIRECTOR;
|
||||
@ -27,7 +33,7 @@ public class OperationalModeUtils{
|
||||
return OperationalMode.DETECTOR;
|
||||
}
|
||||
else{
|
||||
Logger.getAnonymousLogger().log(Level.SEVERE,"Creating new SLO Violation Detection instance as a DETECTOR node, however the specification of the type of node whould be DIRECTOR or DETECTOR, not "+operational_mode);
|
||||
Logger.getGlobal().log(Level.SEVERE,"Creating new SLO Violation Detection instance as a DETECTOR node, however the specification of the type of node whould be DIRECTOR or DETECTOR, not "+operational_mode);
|
||||
return OperationalMode.DIRECTOR;
|
||||
}
|
||||
}
|
||||
|
@ -25,18 +25,18 @@ public class SLOViolationCalculator {
|
||||
double severity_sum = get_greater_than_severity_sum(predictionAttribute);
|
||||
all_metrics_method_attribute_severity = Math.sqrt(severity_sum)/Math.sqrt(3);
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The all-metrics attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_greater_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_greater_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
Logger.getGlobal().log(info_logging_level,"The all-metrics attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_greater_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_greater_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
if (severity_sum<0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
Logger.getGlobal().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
}
|
||||
}
|
||||
else if (rule_type.equals(SLOSubRule.RuleType.less_than_rule)){
|
||||
double severity_sum = get_less_than_severity_sum(predictionAttribute);
|
||||
all_metrics_method_attribute_severity = Math.sqrt(severity_sum)/Math.sqrt(3);
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The all-metrics attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_less_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_less_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
Logger.getGlobal().log(info_logging_level,"The all-metrics attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_less_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_less_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
if (severity_sum<0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
Logger.getGlobal().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
}
|
||||
}
|
||||
else if (rule_type.equals(SLOSubRule.RuleType.equal_rule)){
|
||||
@ -46,25 +46,25 @@ public class SLOViolationCalculator {
|
||||
|
||||
if (less_than_severity_sum>greater_than_severity_sum){
|
||||
all_metrics_method_attribute_severity = Math.sqrt(less_than_severity_sum)/Math.sqrt(3);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The all-metrics attribute severity for an 'equals' rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_less_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_less_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
Logger.getGlobal().log(info_logging_level,"The all-metrics attribute severity for an 'equals' rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_less_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_less_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
if (less_than_severity_sum<0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
Logger.getGlobal().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
}
|
||||
}else{
|
||||
all_metrics_method_attribute_severity = Math.sqrt(greater_than_severity_sum)/Math.sqrt(3);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The all-metrics attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_greater_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_greater_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
Logger.getGlobal().log(info_logging_level,"The all-metrics attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (roc,prconf,normalized_interval,delta) quadraplet of (" + predictionAttribute.getRate_of_change_for_greater_than_rule() + "," + predictionAttribute.getProbability_confidence()+ "," + predictionAttribute.getNormalizedConfidenceIntervalWidth()+","+predictionAttribute.getDelta_for_greater_than_rule() + ") is " + all_metrics_method_attribute_severity);
|
||||
if (greater_than_severity_sum<0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
Logger.getGlobal().log(info_logging_level,"The NaN severity value is produced due to the root of a negative severity sum");
|
||||
}
|
||||
}
|
||||
|
||||
}else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"An unknown type of rule was introduced, therefore setting the severity to -1 to prevent any adaptation");
|
||||
Logger.getGlobal().log(info_logging_level,"An unknown type of rule was introduced, therefore setting the severity to -1 to prevent any adaptation");
|
||||
all_metrics_method_attribute_severity = -1;
|
||||
}
|
||||
|
||||
if (Double.isNaN(all_metrics_method_attribute_severity) || ( all_metrics_method_attribute_severity<0)){
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Negative or NaN severity produced: "+all_metrics_method_attribute_severity+" using 0 instead");
|
||||
Logger.getGlobal().log(warning_logging_level,"Negative or NaN severity produced: "+all_metrics_method_attribute_severity+" using 0 instead");
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
@ -124,24 +124,24 @@ public class SLOViolationCalculator {
|
||||
double severity_sum;
|
||||
if (rule_type.equals(SLOSubRule.RuleType.greater_than_rule)) {
|
||||
severity_sum = (predictionAttribute.getDelta_for_greater_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The prconf-delta attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_greater_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + severity_sum);
|
||||
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_greater_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + severity_sum);
|
||||
}else if (rule_type.equals(SLOSubRule.RuleType.less_than_rule)){
|
||||
severity_sum = (predictionAttribute.getDelta_for_less_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The prconf-delta attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_less_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + severity_sum);
|
||||
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_less_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + severity_sum);
|
||||
}else if (rule_type.equals(SLOSubRule.RuleType.equal_rule)){
|
||||
double greater_than_severity_sum = (predictionAttribute.getDelta_for_greater_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The prconf-delta attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_greater_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + greater_than_severity_sum);
|
||||
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_greater_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + greater_than_severity_sum);
|
||||
|
||||
|
||||
double less_than_severity_sum = (predictionAttribute.getDelta_for_less_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The prconf-delta attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_less_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + less_than_severity_sum);
|
||||
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_less_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + less_than_severity_sum);
|
||||
|
||||
severity_sum = Math.max(less_than_severity_sum,greater_than_severity_sum);
|
||||
}else{
|
||||
severity_sum = -1;
|
||||
}
|
||||
if (severity_sum<0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"A NaN severity value may be produced due to the root of a negative severity sum - returning zero instead for severity sum");
|
||||
Logger.getGlobal().log(info_logging_level,"A NaN severity value may be produced due to the root of a negative severity sum - returning zero instead for severity sum");
|
||||
severity_sum = 0;
|
||||
}
|
||||
|
||||
|
@ -1,11 +1,84 @@
|
||||
package utility_beans;
|
||||
|
||||
public class BrokerPublisher {
|
||||
public BrokerPublisher(String topicForSeverityAnnouncement, String brokerIpUrl, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.*;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utilities.DebugDataSubscription.debug_data_output_topic_name;
|
||||
|
||||
public class BrokerPublisher {
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
public BrokerPublisher(String topic, String brokerIpUrl, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
boolean publisher_configuration_changed;
|
||||
if (!broker_and_topics_to_publish_to.containsKey(brokerIpUrl)){
|
||||
HashSet<String> topics_to_publish_to = new HashSet<>();
|
||||
topics_to_publish_to.add(debug_data_output_topic_name);
|
||||
topics_to_publish_to.add(topic_for_severity_announcement);
|
||||
topics_to_publish_to.add(slo_rules_topic);
|
||||
topics_to_publish_to.add(topic);
|
||||
broker_and_topics_to_publish_to.put(brokerIpUrl,new HashSet<>());
|
||||
publisher_configuration_changed = true;
|
||||
}else{
|
||||
if (!broker_and_topics_to_publish_to.get(brokerIpUrl).contains(topic)){
|
||||
broker_and_topics_to_publish_to.get(brokerIpUrl).add(topic);
|
||||
publisher_configuration_changed = true;
|
||||
}
|
||||
else{
|
||||
publisher_configuration_changed = false;
|
||||
}
|
||||
}
|
||||
if (publisher_configuration_changed){
|
||||
for (String broker_ip : broker_and_topics_to_publish_to.keySet()){
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
|
||||
Publisher current_publisher = new Publisher(slovid_publisher_key,broker_topic,true);
|
||||
publishers.add(current_publisher);
|
||||
if (broker_ip.equals(brokerIpUrl) && broker_topic.equals(topic)){
|
||||
this.private_publisher_instance = current_publisher;
|
||||
this.topic = broker_topic;
|
||||
this.broker_ip = broker_ip;
|
||||
}
|
||||
}
|
||||
Connector connector = new Connector("slovid",
|
||||
new ConnectorHandler() {
|
||||
}, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword
|
||||
)
|
||||
);
|
||||
connector.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void publish(String jsonString) {
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
|
||||
}
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
}
|
||||
|
@ -1,14 +1,149 @@
|
||||
package utility_beans;
|
||||
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Context;
|
||||
import eu.nebulouscloud.exn.core.Handler;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.apache.qpid.protonj2.client.Message;
|
||||
import org.json.simple.JSONValue;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.slovid_publisher_key;
|
||||
|
||||
public class BrokerSubscriber {
|
||||
public BrokerSubscriber(String testTopic, String brokerIpUrl, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
|
||||
private static class MessageProcessingHandler extends Handler{
|
||||
private static final BiFunction temporary_function = (Object o, Object o2) -> {
|
||||
//System.out.println("");
|
||||
Logger.getGlobal().log(Level.INFO,"REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
|
||||
return "IN_PROCESSING";
|
||||
};
|
||||
private BiFunction<String,String,String> processing_function;
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
processing_function.apply(address, JSONValue.toJSONString(body));
|
||||
}
|
||||
public MessageProcessingHandler(){
|
||||
this.processing_function = temporary_function;
|
||||
}
|
||||
public MessageProcessingHandler(BiFunction biFunction){
|
||||
this.processing_function = biFunction;
|
||||
}
|
||||
public BiFunction getProcessing_function() {
|
||||
return processing_function;
|
||||
}
|
||||
public void setProcessing_function(BiFunction processing_function) {
|
||||
this.processing_function = processing_function;
|
||||
}
|
||||
}
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_subscribe_to = new HashMap<>();
|
||||
private static HashMap<String,HashMap<String,Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
|
||||
private static HashMap<String,ExtendedConnector> current_connectors = new HashMap<>();
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
private String brokerUsername;
|
||||
private String brokerPassword;
|
||||
public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
boolean subscriber_configuration_changed;
|
||||
if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)){
|
||||
HashSet<String> topics_to_subscribe_to = new HashSet<>();
|
||||
//topics_to_subscribe_to.add(realtime_metric_topic_name);
|
||||
//topics_to_subscribe_to.add(forecasted_metric_topic_name);
|
||||
//topics_to_subscribe_to.add();
|
||||
topics_to_subscribe_to.add(topic);
|
||||
broker_and_topics_to_subscribe_to.put(broker_ip,new HashSet<>());
|
||||
active_consumers_per_topic_per_broker_ip.put(broker_ip,new HashMap<>());
|
||||
broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
|
||||
|
||||
subscriber_configuration_changed = true;
|
||||
}else{
|
||||
if (!broker_and_topics_to_subscribe_to.get(broker_ip).contains(topic)){
|
||||
broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
|
||||
subscriber_configuration_changed = true;
|
||||
}
|
||||
else{
|
||||
subscriber_configuration_changed = false;
|
||||
}
|
||||
}
|
||||
if (subscriber_configuration_changed){
|
||||
Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler());
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,current_consumer);
|
||||
|
||||
this.topic = topic;
|
||||
this.broker_ip = broker_ip;
|
||||
this.brokerUsername = brokerUsername;
|
||||
this.brokerPassword = brokerPassword;
|
||||
add_topic_consumer_to_broker_connector(current_consumer);
|
||||
}
|
||||
}
|
||||
|
||||
public void subscribe(BiFunction<String, String, String> function, AtomicBoolean atomicBoolean) {
|
||||
/**
|
||||
* This method updates the global connector of SLOViD to the AMQP server, by adding support for one more component
|
||||
*/
|
||||
private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
|
||||
if (current_connectors.get(broker_ip)!=null) {
|
||||
current_connectors.get(broker_ip).add_consumer(new_consumer);
|
||||
}else {
|
||||
ArrayList<Consumer> consumers = new ArrayList<>();
|
||||
consumers.add(new_consumer);
|
||||
ExtendedConnector extended_connector = new ExtendedConnector("slovid",
|
||||
new CustomConnectorHandler() {
|
||||
},
|
||||
List.of(),
|
||||
consumers,
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword
|
||||
)
|
||||
);
|
||||
extended_connector.start();
|
||||
current_connectors.put(broker_ip, extended_connector);
|
||||
}
|
||||
}
|
||||
|
||||
private void remove_topic_from_broker_connector (String topic_key){
|
||||
if (current_connectors.get(broker_ip)!=null){
|
||||
current_connectors.get(broker_ip).remove_consumer_with_key(topic_key);
|
||||
}
|
||||
}
|
||||
|
||||
public void subscribe(BiFunction<String, String, String> function, AtomicBoolean stop_signal) {
|
||||
Logger.getGlobal().log(Level.INFO,"ESTABLISHING SUBSCRIPTION");
|
||||
//First remove any leftover consumer
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
|
||||
remove_topic_from_broker_connector(topic);
|
||||
//Then add the new consumer
|
||||
Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function));
|
||||
new_consumer.setProperty("topic",topic);
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,new_consumer);
|
||||
add_topic_consumer_to_broker_connector(new_consumer);
|
||||
|
||||
Logger.getGlobal().log(Level.INFO,"ESTABLISHED SUBSCRIPTION to topic "+topic);
|
||||
synchronized (stop_signal){
|
||||
while (!stop_signal.get()){
|
||||
try{
|
||||
stop_signal.wait();
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
Logger.getGlobal().log(Level.INFO,"Stopping subscription for broker "+broker_ip+" and topic "+topic);
|
||||
stop_signal.set(false);
|
||||
}
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
|
||||
remove_topic_from_broker_connector(topic);
|
||||
}
|
||||
|
||||
public enum EventFields{
|
||||
@ -16,6 +151,8 @@ public class BrokerSubscriber {
|
||||
|
||||
public enum PredictionMetricEventFields {timestamp, prediction_time, probability, metric_value, confidence_interval}
|
||||
}
|
||||
|
||||
|
||||
public static class TopicNames{
|
||||
public static String realtime_metric_values_topic(String metric) {
|
||||
return null;
|
||||
|
@ -26,14 +26,14 @@ public class CharacterizedThread{
|
||||
((DetectorSubcomponent)subcomponent).getSubcomponent_state().slo_bound_running_threads.put(thread_name, thread);
|
||||
}catch (NullPointerException n){
|
||||
n.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(Level.SEVERE,"Although the thread type for thread "+thread_name+" was declared to be an slo_bound_running_thread, no detector subcomponent was related to it");
|
||||
Logger.getGlobal().log(Level.SEVERE,"Although the thread type for thread "+thread_name+" was declared to be an slo_bound_running_thread, no detector subcomponent was related to it");
|
||||
}
|
||||
}else if (subcomponent.thread_type.equals(persistent_running_director_thread)){
|
||||
((DirectorSubcomponent) subcomponent).persistent_running_director_threads.put(thread_name,thread);
|
||||
}else if (subcomponent.thread_type.equals(persistent_running_detector_thread)){
|
||||
((DetectorSubcomponent)subcomponent).getSubcomponent_state().persistent_running_detector_threads.put(thread_name, thread);
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(Level.WARNING,"Undefined type of thread for thread with name: "+thread_name);
|
||||
Logger.getGlobal().log(Level.WARNING,"Undefined type of thread for thread with name: "+thread_name);
|
||||
}
|
||||
if (start_thread_now) {
|
||||
thread.start();
|
||||
|
@ -0,0 +1,27 @@
|
||||
package utility_beans;
|
||||
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Context;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class CustomConnectorHandler extends ConnectorHandler {
|
||||
private Context context;
|
||||
|
||||
@Override
|
||||
public void onReady(Context context) {
|
||||
this.context = context;
|
||||
}
|
||||
public void remove_consumer_with_key(String key){
|
||||
context.unregisterConsumer(key);
|
||||
}
|
||||
|
||||
public Context getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public void setContext(Context context) {
|
||||
this.context = context;
|
||||
}
|
||||
}
|
@ -0,0 +1,109 @@
|
||||
package utility_beans;
|
||||
|
||||
import javax.swing.*;
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
|
||||
public class CustomDataPublisher {
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
|
||||
boolean publisher_configuration_changed;
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
private_publisher_instance = new Publisher(slovid_publisher_key,broker_topic,true,true);
|
||||
publishers.add(private_publisher_instance);
|
||||
|
||||
|
||||
Connector connector = new Connector("slovid",
|
||||
new ConnectorHandler() {
|
||||
}, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword
|
||||
)
|
||||
);
|
||||
connector.start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key);
|
||||
}
|
||||
public static void main(String[] args){
|
||||
|
||||
//JSONObject msg = new JSONObject();
|
||||
//msg.put("key","value");
|
||||
|
||||
JFrame frame = new JFrame("Broker input app");
|
||||
JTextField smallTextField = new JTextField("eu.nebulouscloud.monitoring.metric_list",30);
|
||||
JTextField othersmallTextField = new JTextField("slovid",20);
|
||||
JTextArea largeTextArea = new JTextArea(10, 30);
|
||||
JButton submitButton = new JButton("Send");
|
||||
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<String> message_payload = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_key = new AtomicReference<>();
|
||||
|
||||
|
||||
submitButton.addActionListener(e -> {
|
||||
broker_topic.set(smallTextField.getText());
|
||||
message_payload.set(largeTextArea.getText());
|
||||
publisher_key.set(othersmallTextField.getText());
|
||||
CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(),"localhost","admin","admin",EMPTY,publisher_key.toString());
|
||||
publisher.publish(message_payload.toString());
|
||||
});
|
||||
|
||||
JPanel panel = new JPanel();
|
||||
panel.add(new JLabel("Topic to publish to:"));
|
||||
panel.add(smallTextField);
|
||||
panel.add(new JLabel("Key to publish with:"));
|
||||
panel.add(othersmallTextField);
|
||||
panel.add(new JLabel("Text to publish:"));
|
||||
panel.add(new JScrollPane(largeTextArea));
|
||||
panel.add(submitButton);
|
||||
|
||||
frame.add(panel);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
|
||||
//publisher.publish(msg);
|
||||
}
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
|
||||
}
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
|
||||
public void publish(JSONObject json_object) {
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
package utility_beans;
|
||||
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Handler;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.ExnConfig;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class ExtendedConnector extends Connector {
|
||||
private ConnectorHandler handler;
|
||||
|
||||
private Connector connector;
|
||||
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, boolean enableHealth, ExnConfig configuration) {
|
||||
super(component, handler, publishers, consumers, enableState, enableHealth, configuration);
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, ExnConfig configuration) {
|
||||
super(component, handler, publishers, consumers, enableState, configuration);
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, ExnConfig configuration) {
|
||||
super(component, handler, publishers, consumers, configuration);
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public ConnectorHandler getHandler() {
|
||||
return handler;
|
||||
}
|
||||
|
||||
public void setHandler(ConnectorHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public void remove_consumer_with_key(String key) {
|
||||
try {
|
||||
((CustomConnectorHandler)handler).getContext().unregisterConsumer(key);
|
||||
}catch (ClassCastException c){
|
||||
Logger.getAnonymousLogger().log(Level.WARNING,"Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
|
||||
}
|
||||
}
|
||||
|
||||
public void add_consumer(Consumer newConsumer) {
|
||||
|
||||
try {
|
||||
((CustomConnectorHandler)handler).getContext().registerConsumer(newConsumer);
|
||||
}catch (ClassCastException c){
|
||||
Logger.getAnonymousLogger().log(Level.WARNING,"Could not register consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
|
||||
}
|
||||
}
|
||||
|
||||
public Connector getConnector() {
|
||||
return connector;
|
||||
}
|
||||
|
||||
public void setConnector(Connector connector) {
|
||||
this.connector = connector;
|
||||
}
|
||||
}
|
@ -51,7 +51,7 @@ public class PredictedMonitoringAttribute {
|
||||
this.threshold = threshold;
|
||||
double current_value = RealtimeMonitoringAttribute.get_metric_value(detector,name);
|
||||
if (Double.isNaN(current_value)){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Detected NaN value for metric "+name+". Thus we cannot compute severity although a predicted value of "+forecasted_value+" has arrived");
|
||||
Logger.getGlobal().log(info_logging_level,"Detected NaN value for metric "+name+". Thus we cannot compute severity although a predicted value of "+forecasted_value+" has arrived");
|
||||
this.initialized = false;
|
||||
return;
|
||||
}
|
||||
@ -143,13 +143,13 @@ public class PredictedMonitoringAttribute {
|
||||
}
|
||||
}
|
||||
else{
|
||||
Logger.getAnonymousLogger().log(severe_logging_level,"Effectively disabling rate of change (ROC) metric, setting it to 0, as an invalid roc_calculation_mode has been chosen");
|
||||
Logger.getGlobal().log(severe_logging_level,"Effectively disabling rate of change (ROC) metric, setting it to 0, as an invalid roc_calculation_mode has been chosen");
|
||||
rate_of_change = 0;
|
||||
normalized_rate_of_change = 0;
|
||||
}
|
||||
String debug_rate_of_change_string = "The rate of change for metric "+name+", having a forecasted value of "+forecasted_value+", previous real value of "+actual_value + ", maximum rate of change equal to "+maximum_rate_of_change+", minimum rate of change equal to "+minimum_rate_of_change+", is "+(int)(rate_of_change*10000)/100.0+"% and the normalized rate of change is "+(int)(normalized_rate_of_change*100)/100.0 +"%";
|
||||
if(!debug_logging_level.equals(Level.OFF)) {
|
||||
Logger.getAnonymousLogger().log(debug_logging_level, debug_rate_of_change_string);
|
||||
Logger.getGlobal().log(debug_logging_level, debug_rate_of_change_string);
|
||||
}
|
||||
|
||||
//Streaming percentile calculation, using non-normalized rate of change
|
||||
@ -168,11 +168,11 @@ public class PredictedMonitoringAttribute {
|
||||
attributes_minimum_rate_of_change.put(name,Math.max(detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).getLower_bound(),-roc_limit));
|
||||
|
||||
if (Double.isNaN(detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).getUpper_bound())){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"NaN value detected for maximum rate of change. The individual metric values are "+detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).toString());
|
||||
Logger.getGlobal().log(info_logging_level,"NaN value detected for maximum rate of change. The individual metric values are "+detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).toString());
|
||||
}
|
||||
|
||||
if (Double.isNaN(detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).getLower_bound())){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"NaN value detected for minimum rate of change. The individual metric values are "+detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).toString());
|
||||
Logger.getGlobal().log(info_logging_level,"NaN value detected for minimum rate of change. The individual metric values are "+detector.getSubcomponent_state().getMonitoring_attributes_roc_statistics().get(name).toString());
|
||||
}
|
||||
|
||||
return Math.max(Math.min(normalized_rate_of_change,100.0),-100.0);
|
||||
@ -185,7 +185,7 @@ public class PredictedMonitoringAttribute {
|
||||
double minimum_metric_value = detector.getSubcomponent_state().getMonitoring_attributes_statistics().get(name).getLower_bound();
|
||||
|
||||
if (Double.isInfinite(this.confidence_interval_width)){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Since the confidence interval is deemed to be infinite, it will be set to 100 and the relevant probability confidence factor should be reduced to the lowest value");
|
||||
Logger.getGlobal().log(info_logging_level,"Since the confidence interval is deemed to be infinite, it will be set to 100 and the relevant probability confidence factor should be reduced to the lowest value");
|
||||
return 100;
|
||||
}
|
||||
if (isZero(maximum_metric_value-minimum_metric_value)){
|
||||
@ -195,7 +195,7 @@ public class PredictedMonitoringAttribute {
|
||||
double normalized_interval_sign = normalized_interval/Math.abs(normalized_interval);
|
||||
if (Math.abs(normalized_interval)>100){
|
||||
normalized_interval = 100*normalized_interval_sign;
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Due to the maximum and minimum metric values being estimated as "+maximum_metric_value+ " and "+minimum_metric_value+" respectively, and as the value of the confidence interval width is "+this.confidence_interval_width+" the absolute value of the normalized interval is limited to a value of "+normalized_interval);
|
||||
Logger.getGlobal().log(info_logging_level,"Due to the maximum and minimum metric values being estimated as "+maximum_metric_value+ " and "+minimum_metric_value+" respectively, and as the value of the confidence interval width is "+this.confidence_interval_width+" the absolute value of the normalized interval is limited to a value of "+normalized_interval);
|
||||
}
|
||||
}
|
||||
return normalized_interval;
|
||||
|
@ -22,11 +22,11 @@ import static utility_beans.PredictedMonitoringAttribute.*;
|
||||
public class RealtimeMonitoringAttribute {
|
||||
|
||||
protected String name;
|
||||
private Integer upper_bound;
|
||||
private Integer lower_bound;
|
||||
private Double upper_bound;
|
||||
private Double lower_bound;
|
||||
private CircularFifoQueue<Double> actual_metric_values = new CircularFifoQueue<>(kept_values_per_metric); //the previous actual values of the metric
|
||||
|
||||
public RealtimeMonitoringAttribute(String name, Integer lower_bound, Integer upper_bound){
|
||||
public RealtimeMonitoringAttribute(String name, Double lower_bound, Double upper_bound){
|
||||
this.name = name;
|
||||
this.lower_bound = lower_bound;
|
||||
this.upper_bound = upper_bound;
|
||||
@ -34,28 +34,28 @@ public class RealtimeMonitoringAttribute {
|
||||
|
||||
public RealtimeMonitoringAttribute(String name, Collection<Double> values){
|
||||
this.name = name;
|
||||
this.lower_bound = 0;
|
||||
this.upper_bound = 100;
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
//Equivalent to below: values.stream().forEach(x -> actual_metric_values.add(x));
|
||||
actual_metric_values.addAll(values);
|
||||
}
|
||||
public RealtimeMonitoringAttribute(String name, Double value){
|
||||
this.name = name;
|
||||
this.lower_bound = 0;
|
||||
this.upper_bound = 100;
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
actual_metric_values.add(value);
|
||||
}
|
||||
|
||||
public RealtimeMonitoringAttribute(String name){
|
||||
this.name = name;
|
||||
this.lower_bound = 0;
|
||||
this.upper_bound = 100;
|
||||
this.lower_bound = 0.0;
|
||||
this.upper_bound = 100.0;
|
||||
}
|
||||
|
||||
public static Double get_metric_value(DetectorSubcomponent detector, String metric_name){
|
||||
CircularFifoQueue<Double> actual_metric_values = detector.getSubcomponent_state().getMonitoring_attributes().get(metric_name).getActual_metric_values();
|
||||
if (actual_metric_values.size()==0){
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Trying to retrieve realtime values from an empty queue for metric "+metric_name);
|
||||
Logger.getGlobal().log(warning_logging_level,"Trying to retrieve realtime values from an empty queue for metric "+metric_name);
|
||||
}
|
||||
return aggregate_metric_values(actual_metric_values);
|
||||
}
|
||||
@ -130,19 +130,19 @@ public class RealtimeMonitoringAttribute {
|
||||
this.actual_metric_values = actual_metric_values;
|
||||
}
|
||||
|
||||
public Integer getUpper_bound() {
|
||||
public Double getUpper_bound() {
|
||||
return upper_bound;
|
||||
}
|
||||
|
||||
public void setUpper_bound(Integer upper_bound) {
|
||||
public void setUpper_bound(Double upper_bound) {
|
||||
this.upper_bound = upper_bound;
|
||||
}
|
||||
|
||||
public Integer getLower_bound() {
|
||||
public Double getLower_bound() {
|
||||
return lower_bound;
|
||||
}
|
||||
|
||||
public void setLower_bound(Integer lower_bound) {
|
||||
public void setLower_bound(Double lower_bound) {
|
||||
this.lower_bound = lower_bound;
|
||||
}
|
||||
}
|
||||
|
@ -2,11 +2,12 @@ self_publish_rule_file = false
|
||||
|
||||
metrics_bounds = avgResponseTime;unbounded;unbounded,custom2;0;3
|
||||
|
||||
slo_rules_topic = metric.metric_list
|
||||
slo_rules_topic = eu.nebulouscloud.monitoring.slo.new
|
||||
single_slo_rule_active = true
|
||||
broker_ip_url = tcp://localhost:61616?wireFormat.maxInactivityDuration=0
|
||||
broker_username = morphemic
|
||||
broker_password = morphemic
|
||||
#broker_ip_url = tcp://localhost:61616?wireFormat.maxInactivityDuration=0
|
||||
broker_ip_url = localhost
|
||||
broker_username = admin
|
||||
broker_password = admin
|
||||
|
||||
slo_violation_probability_threshold = 0.1
|
||||
slo_violation_determination_method = prconf-delta
|
||||
|
@ -57,7 +57,7 @@ public class ConnectivityTests {
|
||||
Double ram_slo_limit = 60.0;
|
||||
Boolean return_value = false;
|
||||
try {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Received " + message);
|
||||
Logger.getGlobal().log(info_logging_level,"Received " + message);
|
||||
JSONObject rules_json = (JSONObject) new JSONParser().parse(message);
|
||||
Double ram_value = Double.parseDouble(rules_json.get("ram").toString());
|
||||
Double cpu_value = Double.parseDouble(rules_json.get("cpu").toString());
|
||||
|
@ -14,7 +14,7 @@ import utility_beans.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static configuration.Constants.default_handled_application_name;
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static configuration.Constants.roc_limit;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getAttributes_maximum_rate_of_change;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getAttributes_minimum_rate_of_change;
|
||||
@ -23,7 +23,7 @@ import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attrib
|
||||
|
||||
public class DerivedMonitoringAttributeTests {
|
||||
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_handled_application_name,CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
@Test
|
||||
public void roc_calculation_test(){
|
||||
|
||||
|
@ -17,11 +17,11 @@ import utility_beans.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static configuration.Constants.default_handled_application_name;
|
||||
import static configuration.Constants.default_application_name;
|
||||
|
||||
|
||||
public class SeverityTests {
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_handled_application_name, CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name, CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
@Test
|
||||
public void all_metrics_Severity_test_1(){
|
||||
|
||||
|
@ -69,7 +69,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
|
||||
//private String metric_1_name = "custom_metric_1";
|
||||
private static final Long targeted_prediction_time = 100000000000L;
|
||||
private final DetectorSubcomponent detector = new DetectorSubcomponent(default_handled_application_name,detached);
|
||||
private final DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,detached);
|
||||
@Test
|
||||
public void unbounded_monitoring_attribute_test_1() throws IOException, ParseException {
|
||||
unbounded_monitoring_attribute_test_core("src/main/resources/test_v3_custom_metric_1_simple.json","custom_metric_1",new Double[]{20.0,35.0},new Double[]{110.0,130.0},0.0,50,100, 90,10,0.80);
|
||||
@ -116,14 +116,14 @@ public class UnboundedMonitoringAttributeTests {
|
||||
MonitoringAttributeUtilities.initialize_values(metric_name, detector.getSubcomponent_state());
|
||||
|
||||
String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric_name);
|
||||
Logger.getAnonymousLogger().log(Level.INFO, "Starting realtime subscription at " + realtime_metric_topic_name);
|
||||
Logger.getGlobal().log(Level.INFO, "Starting realtime subscription at " + realtime_metric_topic_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location);
|
||||
BiFunction<String, String, String> function = (topic, message) -> {
|
||||
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(topic)) {
|
||||
try {
|
||||
update_monitoring_attribute_value(detector,topic, ((Number) ((JSONObject) new JSONParser().parse(message)).get("metricValue")).doubleValue());
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "RECEIVED message with value for " + topic + " equal to " + (((JSONObject) new JSONParser().parse(message)).get("metricValue")));
|
||||
Logger.getGlobal().log(info_logging_level, "RECEIVED message with value for " + topic + " equal to " + (((JSONObject) new JSONParser().parse(message)).get("metricValue")));
|
||||
} catch (ParseException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
@ -151,7 +151,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
double confidence_interval = ((Number)json_array_confidence_interval.get(1)).doubleValue() - ((Number)json_array_confidence_interval.get(0)).doubleValue();
|
||||
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue();
|
||||
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.prediction_time)).longValue();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
|
||||
synchronized (detector.can_modify_slo_rules) {
|
||||
if(!detector.can_modify_slo_rules.getValue()) {
|
||||
@ -160,7 +160,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
detector.can_modify_slo_rules.setValue(false);
|
||||
|
||||
if( detector.getSubcomponent_state().adaptation_times.size()==0 || (!detector.getSubcomponent_state().adaptation_times.contains(targeted_prediction_time)) && targeted_prediction_time>detector.getSubcomponent_state().adaptation_times.stream().min(Long::compare).get()){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Adding a new targeted prediction time "+targeted_prediction_time);
|
||||
Logger.getGlobal().log(info_logging_level,"Adding a new targeted prediction time "+targeted_prediction_time);
|
||||
detector.getSubcomponent_state().adaptation_times.add(targeted_prediction_time);
|
||||
synchronized (detector.PREDICTION_EXISTS) {
|
||||
detector.PREDICTION_EXISTS.setValue(true);
|
||||
@ -209,7 +209,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
double upper_bound = detector.getSubcomponent_state().getMonitoring_attributes_statistics().get(metric_1_name).getUpper_bound();
|
||||
double lower_bound = detector.getSubcomponent_state().getMonitoring_attributes_statistics().get(metric_1_name).getLower_bound();
|
||||
|
||||
Logger.getAnonymousLogger().log(Level.INFO,"The bounds calculated are\nLower bound: "+lower_bound+"\nUpper bound: "+upper_bound);
|
||||
Logger.getGlobal().log(Level.INFO,"The bounds calculated are\nLower bound: "+lower_bound+"\nUpper bound: "+upper_bound);
|
||||
//assert (upper_bound<130 && upper_bound>110 && lower_bound>20 && lower_bound <35);
|
||||
|
||||
SLORule rule = new SLORule(detector,rule_json.toJSONString());
|
||||
@ -217,7 +217,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
assert (upper_bound<metric_upper_bound_range[1] && upper_bound>metric_upper_bound_range[0] && lower_bound>metric_lower_bound_range[0] && lower_bound <metric_lower_bound_range[1]);
|
||||
|
||||
double rule_severity = process_rule_value(rule,targeted_prediction_time);
|
||||
Logger.getAnonymousLogger().log(Level.INFO,"The severity calculated is\nSeverity: "+rule_severity);
|
||||
Logger.getGlobal().log(Level.INFO,"The severity calculated is\nSeverity: "+rule_severity);
|
||||
assert (rule_severity>severity_lower_bound);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user