Miscellaneous changes and code refactoring
Introduction of Spring Boot support Introduction of two separate operational modes, plain detector and director - each a separate subcomponent. Migration of independent threads to use the CharacterizedThread type in order to allow easier management Creation of Runnables class which should eventually contain all Runnables Introduction of DebugMainClass holding the previous functionality of the SLO Violation Detector Differentiation of persistent running threads and slo-bound running threads Small initial improvement of the README file in the direction of specifying the different desired behaviour of director or detector Introduction of new default topics for lost devices and for the slo rules announcement Change-Id: Ibab69eb96bed65135dba0678964a0fd11d264dce Change-Id: I9ff992dd9fccb93984b354a537eee1442d636663
This commit is contained in:
parent
b20b1d6b77
commit
05a804d43f
@ -106,10 +106,12 @@ then an SLO violation should be triggered.
|
||||
|
||||
The component can be built using Maven (`mvn clean install -Dtest=!UnboundedMonitoringAttributeTests`). This command should succeed without errors, verifying that all tests (except for the Unbounded monitoring attribute tests) are successfully executed. Then, any of the produced jar files (either the shaded or non-shaded version) can be run using the following command:
|
||||
|
||||
`java -jar <jar_name> <configuration_file_location>`
|
||||
`java -jar <jar_name> <role_type> <configuration_file_location>`
|
||||
|
||||
When the component starts correctly it will not display any error logs, and it may also display that it listens for events on the topic in which SLO rules are to be specified (by default **metrics.metric_list**).
|
||||
|
||||
It is not mandatory to specify the <configuration_file_location> or the <role_type>; if they are omitted, the defaults will be assumed (the location of the configuration file will be based on the Constants.java class, and the role will be OperationalMode.DIRECTOR).
|
||||
|
||||
When debugging/developing, the component can be started from the Java main method which is located inside the src/runtime/Main.java file.
|
||||
|
||||
### Testing process
|
||||
@ -134,6 +136,11 @@ To test the functionality of the component - provided that a working ActiveMQ Br
|
||||
|
||||
To illustrate, in the case that an SLO message identical to the simple SLO example is sent at step 1, then monitoring messages should be sent in step 2 to the `cpu_usage` topic and predicted monitoring messages should be sent in step 3 to the `prediction.cpu_usage` topic. Finally, SLO violations will be announced at the `prediction.slo_severity_value` topic.
|
||||
|
||||
### Development
|
||||
|
||||
Starting new threads in the SLO Violation Detection component should only be done using the CharacterizedThread class, as opposed to using plain Threads - to ensure that Threads are defined in a way which permits their appropriate management (registration/removal).
|
||||
|
||||
|
||||
### Docker container build
|
||||
|
||||
To run the component in Dockerized form, it is sufficient to build the Dockerfile which is included at the root of the project. When running the docker container, the configuration file which will be used is the `src/main/resources/config/eu.morphemic.slo_violation_detector.properties` file, relative to the root of the project (this location is specified as a variable in the `configuration.Constants` class). If another configuration file needs to be used, then it should be mounted over the `/home/src/main/resources/config/eu.morphemic.slo_violation_detector.properties` location.
|
||||
|
@ -9,33 +9,16 @@
|
||||
<version>4.0-SNAPSHOT</version>
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>runtime.Main</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>11</source>
|
||||
<target>11</target>
|
||||
<source>17</source>
|
||||
<target>17</target>
|
||||
</configuration>
|
||||
<version>3.8.1</version>
|
||||
</plugin>
|
||||
@ -65,8 +48,33 @@
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>3.1.5</version>
|
||||
<relativePath/> <!-- lookup parent from repository -->
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<version>3.1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-tomcat</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>gr.ntua.imu.morphemic</groupId>
|
||||
<artifactId>amq-message-java-library</artifactId>
|
||||
<version>4.0.0-SNAPSHOT</version>
|
||||
@ -76,13 +84,6 @@
|
||||
<artifactId>json-simple</artifactId>
|
||||
<version>1.1</version>
|
||||
</dependency>
|
||||
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.30</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
|
||||
<dependency>
|
||||
@ -97,25 +98,15 @@
|
||||
<version>5.16.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.8.0-beta4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context</artifactId>
|
||||
<version>5.3.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<!-- <dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
<version>2.4.3</version>
|
||||
</dependency>
|
||||
<version>3.1.5</version>
|
||||
</dependency> !-->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-collections4</artifactId>
|
||||
<version>4.2</version>
|
||||
<version>4.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
@ -36,6 +36,8 @@ public class Constants {
|
||||
public static String configuration_file_location = "src/main/resources/config/input_data.properties";
|
||||
public static String amq_library_configuration_location = "src/main/resources/config/eu.melodic.event.brokerclient.properties";
|
||||
public static String topic_for_severity_announcement = "prediction.slo_severity_value";
|
||||
public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost";
|
||||
public static String slo_rules_topic = "metrics.metric_list";
|
||||
public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection
|
||||
public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation
|
||||
public static String roc_calculation_mode = "prototype";
|
||||
|
@ -18,6 +18,7 @@ import org.json.simple.parser.ParseException;
|
||||
import runtime.Main;
|
||||
import slo_processing.SLORule;
|
||||
import slo_processing.SLOSubRule;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
@ -28,13 +29,13 @@ import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static runtime.Main.*;
|
||||
import static utilities.SLOViolationDetectorStateUtils.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.update_monitoring_attribute_value;
|
||||
|
||||
public class AttributeSubscription {
|
||||
SLORule slo_rule;
|
||||
private Thread realtime_subscription_thread, forecasted_subscription_thread;
|
||||
|
||||
|
||||
public AttributeSubscription(SLORule slo_rule, String broker_ip_address, String broker_username, String broker_password){
|
||||
this.slo_rule = slo_rule;
|
||||
@ -59,9 +60,9 @@ public class AttributeSubscription {
|
||||
}
|
||||
return message;
|
||||
};
|
||||
realtime_subscription_thread = new Thread(() -> {
|
||||
Runnable realtime_subscription_runnable = () -> {
|
||||
try {
|
||||
subscriber.subscribe(function, Main.stop_signal);
|
||||
subscriber.subscribe(function, stop_signal);
|
||||
if(Thread.interrupted()){
|
||||
throw new InterruptedException();
|
||||
}
|
||||
@ -72,11 +73,10 @@ public class AttributeSubscription {
|
||||
}
|
||||
}finally{
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing realtime subscriber thread for "+realtime_metric_topic_name);
|
||||
running_threads.remove("realtime_subscriber_thread_" + realtime_metric_topic_name);
|
||||
slo_bound_running_threads.remove("realtime_subscriber_thread_" + realtime_metric_topic_name);
|
||||
}
|
||||
});
|
||||
running_threads.put("realtime_subscriber_thread_"+realtime_metric_topic_name,realtime_subscription_thread);
|
||||
realtime_subscription_thread.start();
|
||||
};
|
||||
CharacterizedThread.create_new_thread(realtime_subscription_runnable,"realtime_subscriber_thread_"+realtime_metric_topic_name,slo_bound_running_thread,true);
|
||||
|
||||
|
||||
|
||||
@ -137,11 +137,11 @@ public class AttributeSubscription {
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
synchronized (Main.can_modify_slo_rules) {
|
||||
while(!Main.can_modify_slo_rules.getValue()) {
|
||||
Main.can_modify_slo_rules.wait();
|
||||
synchronized (can_modify_slo_rules) {
|
||||
while(!can_modify_slo_rules.getValue()) {
|
||||
can_modify_slo_rules.wait();
|
||||
}
|
||||
Main.can_modify_slo_rules.setValue(false);
|
||||
can_modify_slo_rules.setValue(false);
|
||||
for (SLOSubRule subrule : SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(predicted_attribute_name)) { //Get the subrules which are associated to the monitoring attribute which is predicted, and perform the following processing to each one of them
|
||||
|
||||
getPredicted_monitoring_attributes().computeIfAbsent(subrule.getId(), k -> new HashMap<>());
|
||||
@ -173,12 +173,11 @@ public class AttributeSubscription {
|
||||
return message;
|
||||
};
|
||||
|
||||
|
||||
forecasted_subscription_thread = new Thread(() -> {
|
||||
Runnable forecasted_subscription_runnable = () -> {
|
||||
try {
|
||||
synchronized (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) {
|
||||
synchronized (HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name).getValue())
|
||||
forecasted_subscriber.subscribe(forecasted_function,Main.stop_signal);
|
||||
forecasted_subscriber.subscribe(forecasted_function,stop_signal);
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
@ -190,12 +189,10 @@ public class AttributeSubscription {
|
||||
}
|
||||
}finally {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing forecasting subscriber thread for "+forecasted_metric_topic_name);
|
||||
running_threads.remove("forecasting_subscriber_thread_"+forecasted_metric_topic_name);
|
||||
slo_bound_running_threads.remove("forecasting_subscriber_thread_"+forecasted_metric_topic_name);
|
||||
}
|
||||
});
|
||||
running_threads.put("forecasting_subscriber_thread_"+forecasted_metric_topic_name,forecasted_subscription_thread);
|
||||
forecasted_subscription_thread.start();
|
||||
|
||||
};
|
||||
CharacterizedThread.create_new_thread(forecasted_subscription_runnable, "forecasting_subscriber_thread_" + forecasted_metric_topic_name, slo_bound_running_thread, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,155 @@
|
||||
package processing_logic;
|
||||
|
||||
import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
import org.json.simple.JSONObject;
|
||||
import slo_processing.SLORule;
|
||||
import utility_beans.CharacterizedThread;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Clock;
|
||||
import java.util.Date;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Thread.sleep;
|
||||
import static runtime.Main.run_slo_violation_detection_engine;
|
||||
import static slo_processing.SLORule.process_rule_value;
|
||||
import static utilities.SLOViolationDetectorStateUtils.*;
|
||||
import static utilities.SLOViolationDetectorUtils.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread;
|
||||
|
||||
public class Runnables {
|
||||
public static Runnable slo_detector_mode_runnable = () -> {
|
||||
try {
|
||||
run_slo_violation_detection_engine();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
};
|
||||
|
||||
public static Runnable get_severity_calculation_runnable(SLORule rule) {
|
||||
|
||||
Runnable severity_calculation_runnable = () -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
||||
while (!stop_signal.get()) {
|
||||
/*try {
|
||||
Thread.sleep(time_horizon_seconds*1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}*/
|
||||
synchronized (PREDICTION_EXISTS) {
|
||||
while (!PREDICTION_EXISTS.getValue()) {
|
||||
try {
|
||||
PREDICTION_EXISTS.wait();
|
||||
} catch (InterruptedException e) {
|
||||
synchronized (stop_signal) {
|
||||
if (stop_signal.get()) {
|
||||
slo_bound_running_threads.remove("severity_calculation_thread_" + rule.toString());
|
||||
PREDICTION_EXISTS.setValue(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
try {
|
||||
Clock clock = Clock.systemUTC();
|
||||
Long current_time = clock.millis();
|
||||
Long targeted_prediction_time;
|
||||
synchronized (ADAPTATION_TIMES_MODIFY) {
|
||||
while (!ADAPTATION_TIMES_MODIFY.getValue()) {
|
||||
try {
|
||||
ADAPTATION_TIMES_MODIFY.wait();
|
||||
} catch (InterruptedException e) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
clean_data(adaptation_times_to_remove);
|
||||
//targeted_prediction_time = adaptation_times.stream().min(Long::compare).get();
|
||||
targeted_prediction_time = get_next_targeted_prediction_time();
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
if (targeted_prediction_time == null) {
|
||||
continue;
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time);
|
||||
Runnable internal_severity_calculation_runnable = () -> {
|
||||
try {
|
||||
/*
|
||||
synchronized (ADAPTATION_TIMES_MODIFY) {
|
||||
while (!ADAPTATION_TIMES_MODIFY.getValue()) {
|
||||
ADAPTATION_TIMES_MODIFY.wait();
|
||||
}
|
||||
ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
adaptation_times.remove(targeted_prediction_time);//remove from the list of timepoints which should be processed. Later this timepoint will be added to the adaptation_times_to_remove HashSet to remove any data associated with it
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
//adaptation_times_pending_processing.add(targeted_prediction_time);
|
||||
|
||||
*/
|
||||
synchronized (PREDICTION_EXISTS) {
|
||||
PREDICTION_EXISTS.setValue(adaptation_times.size() > 0);
|
||||
}
|
||||
|
||||
Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time;
|
||||
if (sleep_time <= 0) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000);
|
||||
return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete)
|
||||
} else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)");
|
||||
return; //The predictions are too near to the targeted reconfiguration tim
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds");
|
||||
sleep(sleep_time);
|
||||
double rule_severity = process_rule_value(rule.getRule_representation(), targeted_prediction_time, rule.getRule_format());
|
||||
double slo_violation_probability = determine_slo_violation_probability(rule_severity);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand");
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability < slo_violation_probability_threshold ? " so it will not be published" : " and it will be published"));
|
||||
|
||||
if (slo_violation_probability >= slo_violation_probability_threshold) {
|
||||
JSONObject severity_json = new JSONObject();
|
||||
severity_json.put("severity", rule_severity);
|
||||
severity_json.put("probability", slo_violation_probability);
|
||||
severity_json.put("predictionTime", targeted_prediction_time);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
}
|
||||
|
||||
slo_violation_event_recording_queue.add(System.currentTimeMillis());
|
||||
|
||||
//Probably not necessary to synchronize the line below as each removal will happen only once in a reconfiguration interval, and reconfiguration intervals are assumed to have a duration of minutes.
|
||||
//Necessary to synchronize because another severity calculation thread might invoke clean_data above, and then a concurrent modification exception may arise
|
||||
synchronized (ADAPTATION_TIMES_MODIFY) {
|
||||
while (!ADAPTATION_TIMES_MODIFY.getValue()) {
|
||||
ADAPTATION_TIMES_MODIFY.wait();
|
||||
}
|
||||
ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
adaptation_times_to_remove.add(targeted_prediction_time); //This line serves a different purpose from the adaptation_times.remove(...) directive above, as the adaptation_times_to_remove HashSet contains timepoints which should be processed to delete their data.
|
||||
adaptation_times_pending_processing.remove(targeted_prediction_time);
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping...");
|
||||
return;
|
||||
}
|
||||
};
|
||||
CharacterizedThread.create_new_thread(internal_severity_calculation_runnable, "internal_severity_calculation_thread_" + targeted_prediction_time, slo_bound_running_thread, true);
|
||||
} catch (NoSuchElementException n) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing...");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
slo_bound_running_threads.remove("severity_calculation_thread_" + rule.toString());
|
||||
};
|
||||
return severity_calculation_runnable;
|
||||
}
|
||||
}
|
719
slo-violation-detector/src/main/java/runtime/DebugMainClass.java
Normal file
719
slo-violation-detector/src/main/java/runtime/DebugMainClass.java
Normal file
@ -0,0 +1,719 @@
|
||||
/*
|
||||
* Copyright (c) 2023 Institute of Communication and Computer Systems
|
||||
*
|
||||
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
package runtime;
|
||||
import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import metric_retrieval.AttributeSubscription;
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import slo_processing.SLORule;
|
||||
import slo_processing.SLOSubRule;
|
||||
import utilities.DebugDataSubscription;
|
||||
import utilities.MathUtils;
|
||||
import utilities.MonitoringAttributeUtilities;
|
||||
import utility_beans.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Clock;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Thread.sleep;
|
||||
import static slo_processing.SLORule.process_rule_value;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.getMonitoring_attributes_bounds_representation;
|
||||
|
||||
public class DebugMainClass {
|
||||
public static final AtomicBoolean stop_signal = new AtomicBoolean(false);
|
||||
public static final SynchronizedBoolean PREDICTION_EXISTS = new SynchronizedBoolean(false);
|
||||
public static final SynchronizedBoolean ADAPTATION_TIMES_MODIFY = new SynchronizedBoolean(true);
|
||||
public static SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap();
|
||||
public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap();
|
||||
public static ArrayList<SLORule> slo_rules = new ArrayList<>();
|
||||
public static HashMap<String,Thread> running_threads = new HashMap<>();
|
||||
public static HashSet<Long> adaptation_times = new HashSet<>();
|
||||
public static HashSet<Long> adaptation_times_pending_processing = new HashSet<>();
|
||||
private static HashSet<Long> adaptation_times_to_remove = new HashSet<>();
|
||||
public static Long last_processed_adaptation_time = -1L;//initialization
|
||||
public static Long current_slo_rules_version = -1L;//initialization
|
||||
public static final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false);
|
||||
public static final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false);
|
||||
|
||||
//Debugging variables
|
||||
public static CircularFifoQueue<Long> slo_violation_event_recording_queue = new CircularFifoQueue<>(50);
|
||||
public static CircularFifoQueue<String> severity_calculation_event_recording_queue = new CircularFifoQueue<>(50);
|
||||
private static Properties prop = new Properties();
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
//The input of this program is the type of the SLO violations which are monitored, and the predicted metric values which are evaluated. Its output are events which produce an estimate of the probability of an adaptation.
|
||||
//The SLO violations which are monitored need to mention the following data:
|
||||
// - The name of the predicted metrics which are monitored as part of the rule
|
||||
// - The threshold and whether it is a more-than or less-than threshold
|
||||
//The predicted metrics need to include the following data:
|
||||
// - The predicted value
|
||||
// - The prediction confidence
|
||||
|
||||
//During 'normal' execution, business code starts being executed inside the while loop in the try-catch block, and more specifically, code in the 'else' part of the first *if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_VALUES_INPUT)* statement, specifically inside the 'else' part, of the second *if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_TOPICS_INPUT)* statement. This code initializes a subscriber to the topic where rules are expected to be received. This debug class includes also other options for the initial startup of the component.
|
||||
|
||||
try {
|
||||
InputStream inputStream = null;
|
||||
if (args.length == 0) {
|
||||
base_project_path = new File("").toURI();
|
||||
URI absolute_configuration_file_path = new File(configuration_file_location).toURI();
|
||||
URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"This is the base project path:"+base_project_path);
|
||||
inputStream = new FileInputStream(base_project_path.getPath()+relative_configuration_file_path);
|
||||
} else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "A preferences file has been manually specified");
|
||||
|
||||
if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) {
|
||||
base_project_path = new File(args[0]).toURI();
|
||||
}
|
||||
inputStream = new FileInputStream(base_project_path.getPath());
|
||||
}
|
||||
prop.load(inputStream);
|
||||
String slo_rules_topic = prop.getProperty("slo_rules_topic");
|
||||
kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric","5"));
|
||||
self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file"));
|
||||
single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active"));
|
||||
time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds"));
|
||||
|
||||
slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold"));
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions"));
|
||||
ArrayList<String> unbounded_metric_strings = new ArrayList<String>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
for (String metric_string : unbounded_metric_strings){
|
||||
getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";",2)[1]);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
|
||||
if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_VALUES_INPUT) {
|
||||
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
confidence_interval = Double.parseDouble(prop.getProperty("confidence_interval"));
|
||||
prediction_certainty = Double.parseDouble(prop.getProperty("prediction_certainty"));
|
||||
|
||||
ArrayList<String> metric_names = new ArrayList<>() {{
|
||||
add("cpu");
|
||||
add("ram");
|
||||
add("bandwidth");
|
||||
add("disk");
|
||||
}};
|
||||
HashMap<String, Double> input_data = new HashMap<>();
|
||||
for (String metric_name : metric_names) {
|
||||
|
||||
Double metric_input_data = MathUtils.get_average(new ArrayList<>(Arrays.stream(prop.getProperty(metric_name).split(",")).map(Double::parseDouble).collect(Collectors.toList())));
|
||||
|
||||
input_data.put(metric_name, metric_input_data);
|
||||
}
|
||||
|
||||
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names);
|
||||
RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes(metric_names);
|
||||
|
||||
RealtimeMonitoringAttribute.update_monitoring_attributes_values_map(input_data);
|
||||
|
||||
//Parsing of file
|
||||
String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
Logger.getAnonymousLogger().log(info_logging_level, rules_json_string);
|
||||
MESSAGE_CONTENTS.assign_value(slo_rules_topic, rules_json_string);
|
||||
slo_rules.add(new SLORule(MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(",")))));
|
||||
|
||||
} else {
|
||||
if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_TOPICS_INPUT) {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
//do {
|
||||
// can_modify_slo_rules.wait();
|
||||
//}while(!can_modify_slo_rules.getValue());
|
||||
can_modify_slo_rules.setValue(false);
|
||||
|
||||
slo_rules.add(new SLORule(MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(",")))));
|
||||
can_modify_slo_rules.setValue(true);
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
} else if (first_run){
|
||||
|
||||
BiFunction<String, String, String> function = (topic, message) -> {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
can_modify_slo_rules.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(topic, message);
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
}
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
new Thread(() -> {
|
||||
while (true) {
|
||||
subscriber.subscribe(function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
if (self_publish_rule_file) {
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
}
|
||||
}
|
||||
first_run = false;
|
||||
synchronized (can_modify_slo_rules) {
|
||||
do {
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
}catch (InterruptedException i){
|
||||
i.printStackTrace();
|
||||
}
|
||||
}while((!can_modify_slo_rules.getValue()) || (!slo_rule_arrived.get()));
|
||||
can_modify_slo_rules.setValue(false);
|
||||
slo_rule_arrived.set(false);
|
||||
String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic);
|
||||
if (slo_rule_arrived_has_updated_version(rule_representation)) {
|
||||
if (single_slo_rule_active) {
|
||||
slo_rules.clear();
|
||||
}
|
||||
|
||||
ArrayList<String> additional_metrics_from_new_slo = get_metric_list_from_JSON_slo(rule_representation);
|
||||
|
||||
if (additional_metrics_from_new_slo.size() > 0) {
|
||||
slo_rules.add(new SLORule(rule_representation, additional_metrics_from_new_slo));
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}else{
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
stop_all_running_threads();
|
||||
DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_monitoring_datastructures_with_empty_data(slo_rules);
|
||||
//
|
||||
initialize_subrule_and_attribute_associations(slo_rules);
|
||||
initialize_attribute_subscribers(slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_slo_processing(slo_rules);
|
||||
|
||||
//while (true) {
|
||||
|
||||
//}
|
||||
}
|
||||
}catch (Exception e){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Problem reading input file");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Determines whether a newly arrived SLO rule message carries a higher "version" than the
 * SLO rules which are currently active, advancing current_slo_rules_version when it does.
 * @param rule_representation The raw JSON string of the received SLO rule message
 * @return true when the message parses correctly and its version is greater than the
 *         currently known version; false on a null message, a parse failure, a missing
 *         version field, or a stale version number
 */
private static boolean slo_rule_arrived_has_updated_version(String rule_representation) {
    JSONObject json_object = null;
    // Placeholder value only: every path either overwrites it or returns early from a catch block.
    long json_object_version = Integer.MAX_VALUE;
    try {
        json_object = (JSONObject) new JSONParser().parse(rule_representation);
        // Unboxing a missing "version" field ((Long) null) throws NullPointerException,
        // which is handled below together with entirely null messages.
        json_object_version = (Long) json_object.get("version");
    } catch (NullPointerException n){
        n.printStackTrace();
        Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored");
        return false;
    } catch (Exception e){
        e.printStackTrace();
        Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule...");
        return false;
    }
    if (json_object_version > current_slo_rules_version){
        Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived");
        // Remember the new version so later duplicates of this message are ignored.
        current_slo_rules_version=json_object_version;
        return true;
    }else {
        Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated");
        return false;
    }
}
|
||||
|
||||
/**
 * Signals all slo-bound running threads to stop and blocks until every one of them has
 * terminated, re-issuing the stop signal and interrupting stragglers every three seconds.
 * Afterwards the stop signal and the PREDICTION_EXISTS flag are reset and the pending
 * adaptation times are cleared, so that a fresh set of threads can be started cleanly.
 */
private static void stop_all_running_threads() {
    Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate");
    int initial_number_of_running_threads = running_threads.size();
    // Threads deregister themselves from running_threads when they exit; keep nudging until empty.
    while (running_threads.size()>0) {
        synchronized (stop_signal) {
            stop_signal.set(true);
            stop_signal.notifyAll();
        }
        try {
            // Grace period so threads can observe the stop signal, then interrupt any that
            // are still blocked (e.g. in sleep() or wait()).
            Thread.sleep(3000);
            running_threads.values().forEach(Thread::interrupt);
        }catch (Exception e){
        }
        Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads-running_threads.size())+"/"+initial_number_of_running_threads+" already running threads");
        if (running_threads.size()>1){
            Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+running_threads);
        }else if (running_threads.size()>0){
            Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+running_threads);
        }
    }
    Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated");
    // Reset the stop signal so newly spawned threads are not immediately asked to stop.
    synchronized (stop_signal) {
        stop_signal.set(false);
    }
    synchronized (PREDICTION_EXISTS){
        PREDICTION_EXISTS.setValue(false);
    }
    adaptation_times.clear();
}
|
||||
|
||||
public static void initialize_subrule_and_attribute_associations(ArrayList<SLORule> slo_rules) {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
while (!can_modify_slo_rules.getValue()){
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(false);
|
||||
for (SLORule slo_rule : slo_rules) {
|
||||
for (SLOSubRule subrule : SLORule.parse_subrules(slo_rule.getRule_representation(),slo_rule.getRule_format())) {
|
||||
SLOSubRule.getSlo_subrules_per_monitoring_attribute().computeIfAbsent(subrule.getMetric(), k -> new ArrayList<>());
|
||||
SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(subrule.getMetric()).add(subrule);
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static void initialize_monitoring_datastructures_with_empty_data(ArrayList<SLORule> slo_rules){
|
||||
for(SLORule slo_rule: slo_rules){
|
||||
for (String metric_name : slo_rule.get_monitoring_attributes()) {
|
||||
MonitoringAttributeUtilities.initialize_values(metric_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Recursively extracts the set of metric names referenced by an SLO rule expressed in JSON.
 * Two wire formats are supported: an older id-based format (composite rules keyed by their
 * own "id" value, leaf rules carrying an "attribute" field), and a newer format (composite
 * rules with a "name" and a "constraints" array, leaf rules carrying a "metric" field).
 * @param json_object_string The JSON representation of an SLO (or SLO fragment)
 * @return The distinct metric names found, or an empty list when parsing fails
 */
public static ArrayList<String> get_metric_list_from_JSON_slo(String json_object_string) {
    // A HashSet deduplicates metrics which appear in more than one (sub)rule.
    HashSet<String> metric_list = new HashSet<>();
    try {
        JSONObject json_object = (JSONObject) new JSONParser().parse(json_object_string);
        String json_object_id = (String) json_object.get("id");
        String json_object_name = (String) json_object.get("name");
        //Older format uses id-based fields, newer format uses a non-variable structure
        //We first check if an event using the older format is sent, and then check if the event is sent using the newer format
        if (json_object_id!=null) {
            // A dash inside the id marks a composite rule whose children are stored under the id itself.
            if (json_object_id.split("-").length > 1) {
                //String composite_rule_type = json_object_id.split("-")[0];
                JSONArray internal_json_slos = (JSONArray) json_object.get(json_object_id);
                for (Object o : internal_json_slos) {
                    JSONObject internal_json_slo = (JSONObject) o;
                    // Recurse into each nested (sub)rule and merge its metrics.
                    metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString()));
                }
            } else {
                // Leaf rule of the older format: the metric name is held in the "attribute" field.
                metric_list.add((String) json_object.get("attribute"));
            }
        }
        //If newer format is used
        else if (json_object_name!=null){
            JSONArray internal_json_slos = (JSONArray) json_object.get("constraints");
            if ((internal_json_slos!=null) && (internal_json_slos.size()>0)){
                // Composite rule of the newer format: recurse into each constraint.
                for (Object o : internal_json_slos) {
                    JSONObject internal_json_slo = (JSONObject) o;
                    metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString()));
                }
            }else{
                // Leaf rule of the newer format: the metric name is held in the "metric" field.
                metric_list.add((String) json_object.get("metric"));
            }
        }else{
            Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string);
        }
    }catch (Exception p){
        p.printStackTrace();
        // Parsing failed: report no metrics rather than propagate the error.
        return new ArrayList<String>();
    }
    return new ArrayList<String>(metric_list);
}
|
||||
|
||||
private static double get_metric_value_from_JSON(String data_arrived) {
|
||||
double result = -1;
|
||||
|
||||
JSONObject json_data_representation = render_valid_json(data_arrived,"=");
|
||||
//JSONObject json_data_representation = (JSONObject) new JSONParser().parse(data_arrived);
|
||||
result = Double.parseDouble((String)json_data_representation.get("metricValue"));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method replaces any invalid characters in the json which is received from the broker, and creates a valid JSON object
|
||||
* @param data_arrived The data which is received from the broker
|
||||
* @param string_to_replace The invalid character which should be substituted by an 'equals' sign
|
||||
* @return A JSON object
|
||||
*/
|
||||
private static JSONObject render_valid_json(String data_arrived, String string_to_replace) {
|
||||
String valid_json_string = new String(data_arrived);
|
||||
JSONObject json_object = new JSONObject();
|
||||
valid_json_string = valid_json_string.replaceAll(string_to_replace,":");
|
||||
|
||||
valid_json_string = valid_json_string.replaceAll("[{}]","");
|
||||
|
||||
String [] json_elements = valid_json_string.split(",");
|
||||
List <String> json_elements_list = Arrays.stream(json_elements).map(String::trim).collect(Collectors.toList());
|
||||
|
||||
for (String element: json_elements_list) {
|
||||
json_object.put(element.split(":")[0],element.split(":")[1]);
|
||||
}
|
||||
|
||||
return json_object;
|
||||
}
|
||||
|
||||
|
||||
/**
 * Placeholder: should initialize globally shared prediction attribute data.
 * Currently only logs a warning, as the functionality is not yet implemented.
 */
private static void initialize_global_prediction_attribute_data(){
    Logger.getAnonymousLogger().log(warning_logging_level,"Get global prediction attribute data needs implementation");
}
|
||||
|
||||
|
||||
/**
 * Placeholder: should derive a PredictionAttributeSet from the given SLO rules.
 * Not yet implemented - always returns null, so callers must be prepared for that.
 * @param rules The SLO rules from which the prediction attribute set should be built
 * @return Always null until an implementation is provided
 */
private static PredictionAttributeSet get_prediction_attribute_set(ArrayList<SLORule> rules){
    //usedglobalHashmap: attribute_data,
    return null;
}
|
||||
|
||||
private static PredictionAttributeSet initialize_with_existing_values(Double cpu_value, Double ram_value) {
|
||||
ArrayList<String> metric_names = new ArrayList<>(){{
|
||||
add("cpu");
|
||||
add("ram");
|
||||
add("hard_disk");
|
||||
}};
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names);
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value("cpu",cpu_value);
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value("ram",ram_value);
|
||||
|
||||
PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
|
||||
|
||||
PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}});
|
||||
|
||||
return predictionAttributeSet;
|
||||
}
|
||||
private static PredictionAttributeSet pseudo_initialize() throws Exception {
|
||||
ArrayList<String> metric_names = new ArrayList<>(){{
|
||||
add("cpu");
|
||||
add("ram");
|
||||
add("hard_disk");
|
||||
}};
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names);
|
||||
|
||||
//initial cpu values
|
||||
ArrayList<Double> cpu_values = new ArrayList<>();
|
||||
cpu_values.add(10.0);
|
||||
cpu_values.add(20.0);
|
||||
cpu_values.add(30.0);
|
||||
cpu_values.add(40.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(40.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
|
||||
MonitoringAttributeUtilities.initialize_values("cpu",MathUtils.get_average(cpu_values));
|
||||
|
||||
//initial ram values
|
||||
ArrayList<Double> ram_values = new ArrayList<>();
|
||||
ram_values.add(20.0);
|
||||
ram_values.add(20.0);
|
||||
ram_values.add(25.0);
|
||||
ram_values.add(45.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(40.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
MonitoringAttributeUtilities.initialize_values("ram",MathUtils.get_average(ram_values));
|
||||
|
||||
//Get prediction_attribute_sets and calculate method 1 on top of them
|
||||
//Get individual prediction_attributes and calculate method 2 on top of them
|
||||
|
||||
PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
|
||||
|
||||
PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}});
|
||||
|
||||
return predictionAttributeSet;
|
||||
}
|
||||
|
||||
private static ArrayList<AttributeSubscription> initialize_attribute_subscribers(ArrayList<SLORule> rules_list, String broker_ip_address, String broker_username, String broker_password){
|
||||
ArrayList<AttributeSubscription> attribute_subscribers = new ArrayList<>();
|
||||
for (SLORule rule:rules_list){
|
||||
attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password));
|
||||
}
|
||||
return attribute_subscribers;
|
||||
}
|
||||
|
||||
/**
 * Spawns one "severity calculation" thread per SLO rule. Each such thread waits until
 * predictions exist (PREDICTION_EXISTS), picks the next targeted adaptation time, and then
 * spawns a short-lived internal thread which sleeps until time_horizon_seconds before that
 * adaptation time, computes the rule severity from the gathered predictions, and publishes
 * an SLO-violation event to the broker when the violation probability reaches
 * slo_violation_probability_threshold. All spawned rule threads are registered in
 * running_threads so stop_all_running_threads() can terminate them.
 * @param rules_list The SLO rules for which severity processing should be started
 */
public static void initialize_slo_processing(ArrayList<SLORule> rules_list){

    for (SLORule rule:rules_list) {

        Thread severity_calculation_thread = new Thread(() -> {
            // One persistent publisher per rule thread, reused for every violation event of this rule.
            BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);

            while (!stop_signal.get()) {
                /*try {
                    Thread.sleep(time_horizon_seconds*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
                // Block until at least one prediction is available (or shutdown is requested).
                synchronized (PREDICTION_EXISTS) {
                    while (!PREDICTION_EXISTS.getValue()) {
                        try {
                            PREDICTION_EXISTS.wait();
                        } catch (InterruptedException e) {
                            // An interrupt may mean shutdown: when the stop signal is raised,
                            // deregister this thread and exit; otherwise resume waiting.
                            synchronized (stop_signal) {
                                if (stop_signal.get()) {
                                    running_threads.remove("severity_calculation_thread_" + rule.toString());
                                    PREDICTION_EXISTS.setValue(false);
                                    return;
                                }
                            }
                            e.printStackTrace();
                        }
                    }

                }
                try {
                    Clock clock = Clock.systemUTC();
                    Long current_time = clock.millis();
                    Long targeted_prediction_time;
                    // Serialize access to the adaptation-times collections while choosing a target.
                    synchronized (ADAPTATION_TIMES_MODIFY) {
                        while (!ADAPTATION_TIMES_MODIFY.getValue()) {
                            try {
                                ADAPTATION_TIMES_MODIFY.wait();
                            } catch (InterruptedException e) {
                                Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object");
                                e.printStackTrace();
                            }
                        }
                        ADAPTATION_TIMES_MODIFY.setValue(false);
                        // Purge data belonging to adaptation times which have already been handled.
                        clean_data(adaptation_times_to_remove);
                        //targeted_prediction_time = adaptation_times.stream().min(Long::compare).get();
                        targeted_prediction_time = get_next_targeted_prediction_time();
                        ADAPTATION_TIMES_MODIFY.setValue(true);
                        ADAPTATION_TIMES_MODIFY.notifyAll();
                    }
                    if (targeted_prediction_time==null){
                        // No adaptation time is free to process right now; go back to waiting.
                        continue;
                    }
                    Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time);
                    // Short-lived worker performing the actual severity computation for one adaptation time.
                    Thread internal_severity_calculation_thread = new Thread(() -> {
                        try {
                            /*
                            synchronized (ADAPTATION_TIMES_MODIFY) {
                                while (!ADAPTATION_TIMES_MODIFY.getValue()) {
                                    ADAPTATION_TIMES_MODIFY.wait();
                                }
                                ADAPTATION_TIMES_MODIFY.setValue(false);
                                adaptation_times.remove(targeted_prediction_time);//remove from the list of timepoints which should be processed. Later this timepoint will be added to the adaptation_times_to_remove HashSet to remove any data associated with it
                                ADAPTATION_TIMES_MODIFY.setValue(true);
                                ADAPTATION_TIMES_MODIFY.notifyAll();
                            }
                            //adaptation_times_pending_processing.add(targeted_prediction_time);

                            */
                            // More predictions may remain; allow the outer loop to iterate again if so.
                            synchronized (PREDICTION_EXISTS) {
                                PREDICTION_EXISTS.setValue(adaptation_times.size() > 0);
                            }

                            // Sleep until exactly time_horizon_seconds before the targeted adaptation time.
                            Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time;
                            if (sleep_time <= 0) {
                                Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000);
                                return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete)
                            } else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) {
                                Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)");
                                return; //The predictions are too near to the targeted reconfiguration tim
                            }
                            Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds");
                            sleep(sleep_time);
                            double rule_severity = process_rule_value(rule.getRule_representation(), targeted_prediction_time, rule.getRule_format());
                            double slo_violation_probability = determine_slo_violation_probability(rule_severity);
                            Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand");
                            Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability< slo_violation_probability_threshold ?" so it will not be published":" and it will be published"));

                            if (slo_violation_probability>= slo_violation_probability_threshold) {
                                // Publish the violation event to the severity announcement topic.
                                JSONObject severity_json = new JSONObject();
                                severity_json.put("severity", rule_severity);
                                severity_json.put("probability", slo_violation_probability);
                                severity_json.put("predictionTime", targeted_prediction_time);
                                persistent_publisher.publish(severity_json.toJSONString());
                            }

                            // Record the event timestamp for debugging purposes.
                            slo_violation_event_recording_queue.add(System.currentTimeMillis());

                            //Probably not necessary to synchronize the line below as each removal will happen only once in a reconfiguration interval, and reconfiguration intervals are assumed to have a duration of minutes.
                            //Necessary to synchronize because another severity calculation thread might invoke clean_data above, and then a concurrent modification exception may arise
                            synchronized (ADAPTATION_TIMES_MODIFY){
                                while (!ADAPTATION_TIMES_MODIFY.getValue()){
                                    ADAPTATION_TIMES_MODIFY.wait();
                                }
                                ADAPTATION_TIMES_MODIFY.setValue(false);
                                adaptation_times_to_remove.add(targeted_prediction_time); //This line serves a different purpose from the adaptation_times.remove(...) directive above, as the adaptation_times_to_remove HashSet contains timepoints which should be processed to delete their data.
                                adaptation_times_pending_processing.remove(targeted_prediction_time);
                                ADAPTATION_TIMES_MODIFY.setValue(true);
                                ADAPTATION_TIMES_MODIFY.notifyAll();
                            }
                        } catch (InterruptedException i) {
                            Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping...");
                            return;
                        }
                    });
                    internal_severity_calculation_thread.setName("internal_severity_calculation_thread_" + targeted_prediction_time);
                    internal_severity_calculation_thread.start();
                } catch (NoSuchElementException n) {
                    Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing...");
                    continue;
                }
            }
            // Normal exit path: deregister this thread once the stop signal has been observed.
            running_threads.remove("severity_calculation_thread_"+rule.toString());
        });
        String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString();
        severity_calculation_thread.setName(severity_calculation_thread_name);
        severity_calculation_thread.start();
        // Track the thread so stop_all_running_threads() can signal and terminate it later.
        running_threads.put(severity_calculation_thread_name,severity_calculation_thread);
    }/*
    while (true){

    }*/

    //return slo_processors;
}
|
||||
|
||||
private static Long get_next_targeted_prediction_time() {
|
||||
List<Long> possible_targeted_prediction_times = adaptation_times.stream().sorted().limit(maximum_acceptable_forward_predictions).collect(Collectors.toList());
|
||||
for (int i=0; i<possible_targeted_prediction_times.size(); i++){
|
||||
Long possible_targeted_adaptation_time = possible_targeted_prediction_times.get(i);
|
||||
if (!adaptation_times_pending_processing.contains(possible_targeted_adaptation_time)){
|
||||
adaptation_times.remove(possible_targeted_adaptation_time);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing targeted prediction time "+possible_targeted_adaptation_time+" as it is going to be used");
|
||||
adaptation_times_pending_processing.add(possible_targeted_adaptation_time);
|
||||
return possible_targeted_adaptation_time;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
 * Removes the prediction data associated with adaptation times which have already been
 * processed, and advances last_processed_adaptation_time. Access to the shared subrule
 * structures is guarded by the can_modify_slo_rules monitor.
 * @param adaptation_times_to_remove The epoch timestamps whose prediction data should be purged
 */
private static void clean_data(HashSet<Long> adaptation_times_to_remove) {
    for (Long processed_adaptation_time:adaptation_times_to_remove){
        // Track the most recent adaptation time that has been fully handled.
        if (processed_adaptation_time>last_processed_adaptation_time){
            last_processed_adaptation_time = processed_adaptation_time;
        }
        synchronized (can_modify_slo_rules) {
            // Wait until no other thread is modifying the SLO rule structures.
            while (!can_modify_slo_rules.getValue()) {
                try {
                    can_modify_slo_rules.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            can_modify_slo_rules.setValue(false);
            // Drop the predicted values stored for this adaptation time, for every known subrule.
            for (SLORule slo_rule : slo_rules) {
                for (SLOSubRule subrule : slo_rule.getSlo_subrules()) {
                    if (getPredicted_monitoring_attributes().containsKey(subrule.getId())) {
                        getPredicted_monitoring_attributes().get(subrule.getId()).remove(processed_adaptation_time);
                    }
                }
            }
            can_modify_slo_rules.setValue(true);
            can_modify_slo_rules.notifyAll();
        }
    }
}
|
||||
|
||||
/**
|
||||
* This function determines the probability of an SLO violation
|
||||
* @param rule_severity The severity of the rule which has been determined
|
||||
* @return The probability of the rule being violated. The minimum value of this probability is 0, and increases as the severity increases
|
||||
*/
|
||||
public static double determine_slo_violation_probability(double rule_severity) {
|
||||
if (slo_violation_determination_method.equals("all-metrics")) {
|
||||
//39.64 is the mean severity value when examining all integer severity values for roc x probability x confidence_interval x delta_value in (-100,100)x(0,100)x(0,100)x(-100,100)
|
||||
/*
|
||||
if (rule_severity >= 40) {
|
||||
return Math.min((50 + 50*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 50%
|
||||
// return Math.min((100*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 0%
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
||||
*/
|
||||
return Math.min(rule_severity/100,100);
|
||||
}else if (slo_violation_determination_method.equals("prconf-delta")){
|
||||
//Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented");
|
||||
//return 0;
|
||||
if (rule_severity >= 6.52){
|
||||
return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1);
|
||||
}else{
|
||||
return 0;
|
||||
}
|
||||
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Exposes the global stop signal which, when set, instructs slo-bound threads to terminate.
 * @return The AtomicBoolean stop signal shared by all running threads
 */
public static AtomicBoolean getStop_signal() {
    return stop_signal;
}
|
||||
|
||||
}
|
@ -10,59 +10,42 @@ package runtime;
|
||||
|
||||
import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import metric_retrieval.AttributeSubscription;
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import slo_processing.SLORule;
|
||||
import slo_processing.SLOSubRule;
|
||||
import utilities.DebugDataSubscription;
|
||||
import utilities.MathUtils;
|
||||
import utility_beans.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Clock;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Thread.sleep;
|
||||
import static slo_processing.SLORule.process_rule_value;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static processing_logic.Runnables.slo_detector_mode_runnable;
|
||||
import static utilities.OperationalModeUtils.getSLOViolationDetectionOperationalMode;
|
||||
import static utilities.SLOViolationDetectorUtils.*;
|
||||
import static utilities.SLOViolationDetectorStateUtils.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.attached;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadType.persistent_running_thread;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.getMonitoring_attributes_bounds_representation;
|
||||
|
||||
@RestController
|
||||
@SpringBootApplication
|
||||
public class Main {
|
||||
public static final AtomicBoolean stop_signal = new AtomicBoolean(false);
|
||||
public static final SynchronizedBoolean PREDICTION_EXISTS = new SynchronizedBoolean(false);
|
||||
public static final SynchronizedBoolean ADAPTATION_TIMES_MODIFY = new SynchronizedBoolean(true);
|
||||
public static SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap();
|
||||
public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap();
|
||||
public static ArrayList<SLORule> slo_rules = new ArrayList<>();
|
||||
public static HashMap<String,Thread> running_threads = new HashMap<>();
|
||||
public static HashSet<Long> adaptation_times = new HashSet<>();
|
||||
public static HashSet<Long> adaptation_times_pending_processing = new HashSet<>();
|
||||
private static HashSet<Long> adaptation_times_to_remove = new HashSet<>();
|
||||
public static Long last_processed_adaptation_time = -1L;//initialization
|
||||
public static Long current_slo_rules_version = -1L;//initialization
|
||||
public static final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false);
|
||||
public static final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false);
|
||||
|
||||
//Debugging variables
|
||||
public static CircularFifoQueue<Long> slo_violation_event_recording_queue = new CircularFifoQueue<>(50);
|
||||
public static CircularFifoQueue<String> severity_calculation_event_recording_queue = new CircularFifoQueue<>(50);
|
||||
private static Properties prop = new Properties();
|
||||
|
||||
public static boolean EXECUTION_NOT_TERMINATED = true;
|
||||
public static void main(String[] args) {
|
||||
|
||||
//The input of this program is the type of the SLO violations which are monitored, and the predicted metric values which are evaluated. Its output are events which produce an estimate of the probability of an adaptation.
|
||||
@ -74,645 +57,190 @@ public class Main {
|
||||
// - The prediction confidence
|
||||
|
||||
try {
|
||||
InputStream inputStream = null;
|
||||
if (args.length == 0) {
|
||||
base_project_path = new File("").toURI();
|
||||
URI absolute_configuration_file_path = new File(configuration_file_location).toURI();
|
||||
URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"This is the base project path:"+base_project_path);
|
||||
inputStream = new FileInputStream(base_project_path.getPath()+relative_configuration_file_path);
|
||||
} else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "A preferences file has been manually specified");
|
||||
|
||||
if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) {
|
||||
base_project_path = new File(args[0]).toURI();
|
||||
}
|
||||
inputStream = new FileInputStream(base_project_path.getPath());
|
||||
}
|
||||
prop.load(inputStream);
|
||||
String slo_rules_topic = prop.getProperty("slo_rules_topic");
|
||||
kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric","5"));
|
||||
self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file"));
|
||||
single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active"));
|
||||
time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds"));
|
||||
|
||||
slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold"));
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions"));
|
||||
ArrayList<String> unbounded_metric_strings = new ArrayList<String>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
for (String metric_string : unbounded_metric_strings){
|
||||
getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";",2)[1]);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
|
||||
if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_VALUES_INPUT) {
|
||||
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
confidence_interval = Double.parseDouble(prop.getProperty("confidence_interval"));
|
||||
prediction_certainty = Double.parseDouble(prop.getProperty("prediction_certainty"));
|
||||
|
||||
ArrayList<String> metric_names = new ArrayList<>() {{
|
||||
add("cpu");
|
||||
add("ram");
|
||||
add("bandwidth");
|
||||
add("disk");
|
||||
}};
|
||||
HashMap<String, Double> input_data = new HashMap<>();
|
||||
for (String metric_name : metric_names) {
|
||||
|
||||
Double metric_input_data = MathUtils.get_average(new ArrayList<>(Arrays.stream(prop.getProperty(metric_name).split(",")).map(Double::parseDouble).collect(Collectors.toList())));
|
||||
|
||||
input_data.put(metric_name, metric_input_data);
|
||||
}
|
||||
|
||||
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names);
|
||||
RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes(metric_names);
|
||||
|
||||
RealtimeMonitoringAttribute.update_monitoring_attributes_values_map(input_data);
|
||||
|
||||
//Parsing of file
|
||||
String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
Logger.getAnonymousLogger().log(info_logging_level, rules_json_string);
|
||||
Main.MESSAGE_CONTENTS.assign_value(slo_rules_topic, rules_json_string);
|
||||
slo_rules.add(new SLORule(Main.MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(",")))));
|
||||
|
||||
{
|
||||
InputStream inputStream = null;
|
||||
if (args.length == 0) {
|
||||
operational_mode = getSLOViolationDetectionOperationalMode("DIRECTOR");
|
||||
inputStream = getPreferencesFileInputStream(EMPTY);
|
||||
} else if (args.length == 1) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Operational mode has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[0]);
|
||||
inputStream = getPreferencesFileInputStream(EMPTY);
|
||||
} else {
|
||||
if (first_run && USE_CONFIGURATION_FILE_FOR_METRIC_TOPICS_INPUT) {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
//do {
|
||||
// can_modify_slo_rules.wait();
|
||||
//}while(!can_modify_slo_rules.getValue());
|
||||
can_modify_slo_rules.setValue(false);
|
||||
|
||||
slo_rules.add(new SLORule(Main.MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic), new ArrayList<>(Arrays.asList(prop.getProperty("metrics_list").split(",")))));
|
||||
can_modify_slo_rules.setValue(true);
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
} else if (first_run){
|
||||
|
||||
BiFunction<String, String, String> function = (topic, message) -> {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
can_modify_slo_rules.setValue(true);
|
||||
Main.MESSAGE_CONTENTS.assign_value(topic, message);
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
}
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
new Thread(() -> {
|
||||
while (true) {
|
||||
subscriber.subscribe(function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
||||
if (self_publish_rule_file) {
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
}
|
||||
}
|
||||
first_run = false;
|
||||
synchronized (can_modify_slo_rules) {
|
||||
do {
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
}catch (InterruptedException i){
|
||||
i.printStackTrace();
|
||||
}
|
||||
}while((!can_modify_slo_rules.getValue()) || (!slo_rule_arrived.get()));
|
||||
can_modify_slo_rules.setValue(false);
|
||||
slo_rule_arrived.set(false);
|
||||
String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic);
|
||||
if (slo_rule_arrived_has_updated_version(rule_representation)) {
|
||||
if (single_slo_rule_active) {
|
||||
slo_rules.clear();
|
||||
}
|
||||
|
||||
ArrayList<String> additional_metrics_from_new_slo = get_metric_list_from_JSON_slo(rule_representation);
|
||||
|
||||
if (additional_metrics_from_new_slo.size() > 0) {
|
||||
slo_rules.add(new SLORule(rule_representation, additional_metrics_from_new_slo));
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}else{
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Operational mode and preferences file has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[0]);
|
||||
inputStream = getPreferencesFileInputStream(args[1]);
|
||||
|
||||
}
|
||||
prop.load(inputStream);
|
||||
slo_rules_topic = prop.getProperty("slo_rules_topic");
|
||||
kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric", "5"));
|
||||
self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file"));
|
||||
single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active"));
|
||||
time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds"));
|
||||
|
||||
stop_all_running_threads();
|
||||
DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_monitoring_datastructures_with_empty_data(slo_rules);
|
||||
//
|
||||
initialize_subrule_and_attribute_associations(slo_rules);
|
||||
initialize_attribute_subscribers(slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_slo_processing(slo_rules);
|
||||
|
||||
//while (true) {
|
||||
|
||||
//}
|
||||
slo_violation_probability_threshold = Double.parseDouble(prop.getProperty("slo_violation_probability_threshold"));
|
||||
slo_violation_determination_method = prop.getProperty("slo_violation_determination_method");
|
||||
maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions"));
|
||||
ArrayList<String> unbounded_metric_strings = new ArrayList<String>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
for (String metric_string : unbounded_metric_strings) {
|
||||
getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";", 2)[1]);
|
||||
}
|
||||
} //initialization
|
||||
if (operational_mode.equals(OperationalMode.DETECTOR)) {
|
||||
start_new_slo_violation_detector_subcomponent(attached);
|
||||
}else if (operational_mode.equals(OperationalMode.DIRECTOR)){
|
||||
/*
|
||||
while (EXECUTION_NOT_TERMINATED) {
|
||||
synchronized (create_new_slo_detector) {
|
||||
create_new_slo_detector.wait();
|
||||
for (int i=1; i<=create_new_slo_detector.getValue(); i++) {
|
||||
CharacterizedThread.create_new_thread(slo_detector_mode_runnable, "slo_detector_master_thread", persistent_running_thread, true);
|
||||
}
|
||||
create_new_slo_detector.setValue(0);
|
||||
}
|
||||
}
|
||||
*/
|
||||
SpringApplication.run(Main.class, args);
|
||||
System.out.println("TEST");
|
||||
//TODO start a new DETECTOR instance when a relevant message appears
|
||||
}
|
||||
}catch (Exception e){
|
||||
}catch (IOException e){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Problem reading input file");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean slo_rule_arrived_has_updated_version(String rule_representation) {
|
||||
JSONObject json_object = null;
|
||||
long json_object_version = Integer.MAX_VALUE;
|
||||
try {
|
||||
json_object = (JSONObject) new JSONParser().parse(rule_representation);
|
||||
json_object_version = (Long) json_object.get("version");
|
||||
} catch (NullPointerException n){
|
||||
n.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored");
|
||||
return false;
|
||||
} catch (Exception e){
|
||||
}catch (Exception e){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Miscellaneous issue during startup");
|
||||
e.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule...");
|
||||
return false;
|
||||
}
|
||||
if (json_object_version > current_slo_rules_version){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived");
|
||||
current_slo_rules_version=json_object_version;
|
||||
return true;
|
||||
}else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static void stop_all_running_threads() {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate");
|
||||
int initial_number_of_running_threads = running_threads.size();
|
||||
while (running_threads.size()>0) {
|
||||
synchronized (stop_signal) {
|
||||
stop_signal.set(true);
|
||||
stop_signal.notifyAll();
|
||||
}
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
running_threads.values().forEach(Thread::interrupt);
|
||||
}catch (Exception e){
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads-running_threads.size())+"/"+initial_number_of_running_threads+" already running threads");
|
||||
if (running_threads.size()>1){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+running_threads);
|
||||
}else if (running_threads.size()>0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+running_threads);
|
||||
}
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated");
|
||||
synchronized (stop_signal) {
|
||||
stop_signal.set(false);
|
||||
}
|
||||
synchronized (PREDICTION_EXISTS){
|
||||
PREDICTION_EXISTS.setValue(false);
|
||||
}
|
||||
adaptation_times.clear();
|
||||
@RequestMapping("/add-new-detector")
|
||||
public static String start_new_slo_violation_detector_subcomponent() throws IOException {
|
||||
start_new_slo_violation_detector_subcomponent(detached);
|
||||
return ("Spawned new SLO Detector subcomponent instance!");
|
||||
}
|
||||
|
||||
public static void initialize_subrule_and_attribute_associations(ArrayList<SLORule> slo_rules) {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
while (!can_modify_slo_rules.getValue()){
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(false);
|
||||
for (SLORule slo_rule : slo_rules) {
|
||||
for (SLOSubRule subrule : SLORule.parse_subrules(slo_rule.getRule_representation(),slo_rule.getRule_format())) {
|
||||
SLOSubRule.getSlo_subrules_per_monitoring_attribute().computeIfAbsent(subrule.getMetric(), k -> new ArrayList<>());
|
||||
SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(subrule.getMetric()).add(subrule);
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
public static void start_new_slo_violation_detector_subcomponent(CharacterizedThread.CharacterizedThreadRunMode characterized_thread_run_mode) throws IOException {
|
||||
if (characterized_thread_run_mode.equals(attached)) {
|
||||
run_slo_violation_detection_engine();
|
||||
}else/*detached mode*/{
|
||||
CharacterizedThread.create_new_thread(slo_detector_mode_runnable, "slo_detector_master_thread", persistent_running_thread, true);
|
||||
}
|
||||
}
|
||||
public static void run_slo_violation_detection_engine() throws IOException {
|
||||
while (true) {
|
||||
if (first_run){
|
||||
//Creation of threads that should always run and are independent of the monitored application.
|
||||
//1. Creation of the slo rule input subscriber thread, which listens for new slo rules to be considered
|
||||
//2. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device
|
||||
|
||||
BiFunction<String, String, String> slo_rule_topic_subscriber_function = (topic, message) -> {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
can_modify_slo_rules.setValue(true);
|
||||
MESSAGE_CONTENTS.assign_value(topic, message);
|
||||
slo_rule_arrived.set(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
|
||||
private static void initialize_monitoring_datastructures_with_empty_data(ArrayList<SLORule> slo_rules){
|
||||
for(SLORule slo_rule: slo_rules){
|
||||
for (String metric_name : slo_rule.get_monitoring_attributes()) {
|
||||
MonitoringAttributeUtilities.initialize_values(metric_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "BrokerClientApp: - Received text message: " + message + " at topic " + topic);
|
||||
|
||||
public static ArrayList<String> get_metric_list_from_JSON_slo(String json_object_string) {
|
||||
HashSet<String> metric_list = new HashSet<>();
|
||||
try {
|
||||
JSONObject json_object = (JSONObject) new JSONParser().parse(json_object_string);
|
||||
String json_object_id = (String) json_object.get("id");
|
||||
String json_object_name = (String) json_object.get("name");
|
||||
//Older format uses id-based fields, newer format uses a non-variable structure
|
||||
//We first check if an event using the older format is sent, and then check if the event is sent using the newer format
|
||||
if (json_object_id!=null) {
|
||||
if (json_object_id.split("-").length > 1) {
|
||||
//String composite_rule_type = json_object_id.split("-")[0];
|
||||
JSONArray internal_json_slos = (JSONArray) json_object.get(json_object_id);
|
||||
for (Object o : internal_json_slos) {
|
||||
JSONObject internal_json_slo = (JSONObject) o;
|
||||
metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString()));
|
||||
}
|
||||
} else {
|
||||
metric_list.add((String) json_object.get("attribute"));
|
||||
}
|
||||
}
|
||||
//If newer format is used
|
||||
else if (json_object_name!=null){
|
||||
JSONArray internal_json_slos = (JSONArray) json_object.get("constraints");
|
||||
if ((internal_json_slos!=null) && (internal_json_slos.size()>0)){
|
||||
for (Object o : internal_json_slos) {
|
||||
JSONObject internal_json_slo = (JSONObject) o;
|
||||
metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString()));
|
||||
}
|
||||
}else{
|
||||
metric_list.add((String) json_object.get("metric"));
|
||||
}
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string);
|
||||
}
|
||||
}catch (Exception p){
|
||||
p.printStackTrace();
|
||||
return new ArrayList<String>();
|
||||
}
|
||||
return new ArrayList<String>(metric_list);
|
||||
}
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
private static double get_metric_value_from_JSON(String data_arrived) {
|
||||
double result = -1;
|
||||
|
||||
JSONObject json_data_representation = render_valid_json(data_arrived,"=");
|
||||
//JSONObject json_data_representation = (JSONObject) new JSONParser().parse(data_arrived);
|
||||
result = Double.parseDouble((String)json_data_representation.get("metricValue"));
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method replaces any invalid characters in the json which is received from the broker, and creates a valid JSON object
|
||||
* @param data_arrived The data which is received from the broker
|
||||
* @param string_to_replace The invalid character which should be substituted by an 'equals' sign
|
||||
* @return A JSON object
|
||||
*/
|
||||
private static JSONObject render_valid_json(String data_arrived, String string_to_replace) {
|
||||
String valid_json_string = new String(data_arrived);
|
||||
JSONObject json_object = new JSONObject();
|
||||
valid_json_string = valid_json_string.replaceAll(string_to_replace,":");
|
||||
|
||||
valid_json_string = valid_json_string.replaceAll("[{}]","");
|
||||
|
||||
String [] json_elements = valid_json_string.split(",");
|
||||
List <String> json_elements_list = Arrays.stream(json_elements).map(String::trim).collect(Collectors.toList());
|
||||
|
||||
for (String element: json_elements_list) {
|
||||
json_object.put(element.split(":")[0],element.split(":")[1]);
|
||||
}
|
||||
|
||||
return json_object;
|
||||
}
|
||||
|
||||
|
||||
private static void initialize_global_prediction_attribute_data(){
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Get global prediction attribute data needs implementation");
|
||||
}
|
||||
|
||||
|
||||
private static PredictionAttributeSet get_prediction_attribute_set(ArrayList<SLORule> rules){
|
||||
//usedglobalHashmap: attribute_data,
|
||||
return null;
|
||||
}
|
||||
|
||||
private static PredictionAttributeSet initialize_with_existing_values(Double cpu_value, Double ram_value) {
|
||||
ArrayList<String> metric_names = new ArrayList<>(){{
|
||||
add("cpu");
|
||||
add("ram");
|
||||
add("hard_disk");
|
||||
}};
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names);
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value("cpu",cpu_value);
|
||||
RealtimeMonitoringAttribute.update_monitoring_attribute_value("ram",ram_value);
|
||||
|
||||
PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
|
||||
|
||||
PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}});
|
||||
|
||||
return predictionAttributeSet;
|
||||
}
|
||||
private static PredictionAttributeSet pseudo_initialize() throws Exception {
|
||||
ArrayList<String> metric_names = new ArrayList<>(){{
|
||||
add("cpu");
|
||||
add("ram");
|
||||
add("hard_disk");
|
||||
}};
|
||||
RealtimeMonitoringAttribute.initialize_monitoring_attribute_rates_of_change(metric_names);
|
||||
|
||||
//initial cpu values
|
||||
ArrayList<Double> cpu_values = new ArrayList<>();
|
||||
cpu_values.add(10.0);
|
||||
cpu_values.add(20.0);
|
||||
cpu_values.add(30.0);
|
||||
cpu_values.add(40.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(40.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
cpu_values.add(50.0);
|
||||
|
||||
MonitoringAttributeUtilities.initialize_values("cpu",MathUtils.get_average(cpu_values));
|
||||
|
||||
//initial ram values
|
||||
ArrayList<Double> ram_values = new ArrayList<>();
|
||||
ram_values.add(20.0);
|
||||
ram_values.add(20.0);
|
||||
ram_values.add(25.0);
|
||||
ram_values.add(45.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(40.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
ram_values.add(30.0);
|
||||
MonitoringAttributeUtilities.initialize_values("ram",MathUtils.get_average(ram_values));
|
||||
|
||||
//Get prediction_attribute_sets and calculate method 1 on top of them
|
||||
//Get individual prediction_attributes and calculate method 2 on top of them
|
||||
|
||||
PredictedMonitoringAttribute cpuPredictionAttribute = new PredictedMonitoringAttribute("cpu", 70,1, 90.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
PredictedMonitoringAttribute ramPredictionAttribute = new PredictedMonitoringAttribute("ram", 50,2, 70.0,80,10,System.currentTimeMillis(),System.currentTimeMillis()+10000);
|
||||
|
||||
|
||||
PredictionAttributeSet predictionAttributeSet = new PredictionAttributeSet(new ArrayList<>(){{add(cpuPredictionAttribute);add(ramPredictionAttribute);}});
|
||||
|
||||
return predictionAttributeSet;
|
||||
}
|
||||
|
||||
private static ArrayList<AttributeSubscription> initialize_attribute_subscribers(ArrayList<SLORule> rules_list, String broker_ip_address, String broker_username, String broker_password){
|
||||
ArrayList<AttributeSubscription> attribute_subscribers = new ArrayList<>();
|
||||
for (SLORule rule:rules_list){
|
||||
attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password));
|
||||
}
|
||||
return attribute_subscribers;
|
||||
}
|
||||
|
||||
public static void initialize_slo_processing(ArrayList<SLORule> rules_list){
|
||||
|
||||
for (SLORule rule:rules_list) {
|
||||
|
||||
Thread severity_calculation_thread = new Thread(() -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
||||
while (!stop_signal.get()) {
|
||||
/*try {
|
||||
Thread.sleep(time_horizon_seconds*1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}*/
|
||||
synchronized (PREDICTION_EXISTS) {
|
||||
while (!PREDICTION_EXISTS.getValue()) {
|
||||
try {
|
||||
PREDICTION_EXISTS.wait();
|
||||
} catch (InterruptedException e) {
|
||||
synchronized (stop_signal) {
|
||||
if (stop_signal.get()) {
|
||||
running_threads.remove("severity_calculation_thread_" + rule.toString());
|
||||
PREDICTION_EXISTS.setValue(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
e.printStackTrace();
|
||||
}
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
Runnable slo_rules_topic_subscriber_runnable = () -> {
|
||||
while (true) {
|
||||
slo_rule_topic_subscriber.subscribe(slo_rule_topic_subscriber_function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
|
||||
}
|
||||
try {
|
||||
Clock clock = Clock.systemUTC();
|
||||
Long current_time = clock.millis();
|
||||
Long targeted_prediction_time;
|
||||
synchronized (ADAPTATION_TIMES_MODIFY) {
|
||||
while (!ADAPTATION_TIMES_MODIFY.getValue()) {
|
||||
try {
|
||||
ADAPTATION_TIMES_MODIFY.wait();
|
||||
} catch (InterruptedException e) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level, "Interrupted while waiting to access the lock for adaptation times object");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
clean_data(adaptation_times_to_remove);
|
||||
//targeted_prediction_time = adaptation_times.stream().min(Long::compare).get();
|
||||
targeted_prediction_time = get_next_targeted_prediction_time();
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
};
|
||||
CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread", persistent_running_thread,true);
|
||||
|
||||
|
||||
//Implementation of 'Lost edge device' thread
|
||||
|
||||
BiFunction<String, String, String> device_lost_subscriber_function = (topic, message) -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
||||
Clock clock = Clock.systemUTC();
|
||||
Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0);
|
||||
JSONObject severity_json = new JSONObject();
|
||||
severity_json.put("severity", 100);
|
||||
severity_json.put("probability", 100);
|
||||
severity_json.put("predictionTime", current_time_seconds);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
|
||||
//TODO possibly necessary to remove the next adaptation time as it will probably not be possible to start an adaptation during it
|
||||
return topic + ":MSG:" + message;
|
||||
};
|
||||
|
||||
BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
||||
Runnable device_lost_topic_subscriber_runnable = () -> {
|
||||
while (true) {
|
||||
device_lost_subscriber.subscribe(device_lost_subscriber_function, new AtomicBoolean(false)); //This subscriber should be immune to stop signals
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
}catch (InterruptedException i){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
|
||||
}
|
||||
if (targeted_prediction_time==null){
|
||||
continue;
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Targeted_prediction_time " + targeted_prediction_time);
|
||||
Thread internal_severity_calculation_thread = new Thread(() -> {
|
||||
try {
|
||||
/*
|
||||
synchronized (ADAPTATION_TIMES_MODIFY) {
|
||||
while (!ADAPTATION_TIMES_MODIFY.getValue()) {
|
||||
ADAPTATION_TIMES_MODIFY.wait();
|
||||
}
|
||||
ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
adaptation_times.remove(targeted_prediction_time);//remove from the list of timepoints which should be processed. Later this timepoint will be added to the adaptation_times_to_remove HashSet to remove any data associated with it
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
//adaptation_times_pending_processing.add(targeted_prediction_time);
|
||||
|
||||
*/
|
||||
synchronized (PREDICTION_EXISTS) {
|
||||
PREDICTION_EXISTS.setValue(adaptation_times.size() > 0);
|
||||
}
|
||||
|
||||
Long sleep_time = targeted_prediction_time * 1000 - time_horizon_seconds * 1000L - current_time;
|
||||
if (sleep_time <= 0) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " current time is " + current_time + " and the time_horizon is " + time_horizon_seconds * 1000);
|
||||
return; //The predictions are too near to the targeted reconfiguration time (or are even obsolete)
|
||||
} else if (sleep_time > current_time + maximum_acceptable_forward_predictions * time_horizon_seconds * 1000L) {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Prediction cancelled as targeted prediction time was " + targeted_prediction_time * 1000 + " and the current time is " + current_time + ". The prediction is more than " + maximum_acceptable_forward_predictions + " time_horizon intervals into the future (the time_horizon is " + time_horizon_seconds * 1000 + " milliseconds)");
|
||||
return; //The predictions are too near to the targeted reconfiguration tim
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sleeping for " + sleep_time + " milliseconds");
|
||||
sleep(sleep_time);
|
||||
double rule_severity = process_rule_value(rule.getRule_representation(), targeted_prediction_time, rule.getRule_format());
|
||||
double slo_violation_probability = determine_slo_violation_probability(rule_severity);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The overall " + slo_violation_determination_method + " severity - calculated from real data - for adaptation time " + targeted_prediction_time + " ( " + (new Date((new Timestamp(targeted_prediction_time * 1000)).getTime())) + " ) is " + rule_severity + " and is calculated " + time_horizon_seconds + " seconds beforehand");
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "The probability of an SLO violation is " + ((int) (slo_violation_probability * 100)) + "%" + (slo_violation_probability< slo_violation_probability_threshold ?" so it will not be published":" and it will be published"));
|
||||
|
||||
if (slo_violation_probability>= slo_violation_probability_threshold) {
|
||||
JSONObject severity_json = new JSONObject();
|
||||
severity_json.put("severity", rule_severity);
|
||||
severity_json.put("probability", slo_violation_probability);
|
||||
severity_json.put("predictionTime", targeted_prediction_time);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
}
|
||||
|
||||
slo_violation_event_recording_queue.add(System.currentTimeMillis());
|
||||
|
||||
//Probably not necessary to synchronize the line below as each removal will happen only once in a reconfiguration interval, and reconfiguration intervals are assumed to have a duration of minutes.
|
||||
//Necessary to synchronize because another severity calculation thread might invoke clean_data above, and then a concurrent modification exception may arise
|
||||
synchronized (ADAPTATION_TIMES_MODIFY){
|
||||
while (!ADAPTATION_TIMES_MODIFY.getValue()){
|
||||
ADAPTATION_TIMES_MODIFY.wait();
|
||||
}
|
||||
ADAPTATION_TIMES_MODIFY.setValue(false);
|
||||
adaptation_times_to_remove.add(targeted_prediction_time); //This line serves a different purpose from the adaptation_times.remove(...) directive above, as the adaptation_times_to_remove HashSet contains timepoints which should be processed to delete their data.
|
||||
adaptation_times_pending_processing.remove(targeted_prediction_time);
|
||||
ADAPTATION_TIMES_MODIFY.setValue(true);
|
||||
ADAPTATION_TIMES_MODIFY.notifyAll();
|
||||
}
|
||||
} catch (InterruptedException i) {
|
||||
Logger.getAnonymousLogger().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping...");
|
||||
return;
|
||||
}
|
||||
});
|
||||
internal_severity_calculation_thread.setName("internal_severity_calculation_thread_" + targeted_prediction_time);
|
||||
internal_severity_calculation_thread.start();
|
||||
} catch (NoSuchElementException n) {
|
||||
Logger.getAnonymousLogger().log(warning_logging_level, "Could not calculate severity as a value was missing...");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",persistent_running_thread,true);
|
||||
|
||||
|
||||
if (self_publish_rule_file) {
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
String rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
running_threads.remove("severity_calculation_thread_"+rule.toString());
|
||||
});
|
||||
String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString();
|
||||
severity_calculation_thread.setName(severity_calculation_thread_name);
|
||||
severity_calculation_thread.start();
|
||||
running_threads.put(severity_calculation_thread_name,severity_calculation_thread);
|
||||
}/*
|
||||
while (true){
|
||||
|
||||
}*/
|
||||
|
||||
//return slo_processors;
|
||||
}
|
||||
|
||||
private static Long get_next_targeted_prediction_time() {
|
||||
List<Long> possible_targeted_prediction_times = adaptation_times.stream().sorted().limit(maximum_acceptable_forward_predictions).collect(Collectors.toList());
|
||||
for (int i=0; i<possible_targeted_prediction_times.size(); i++){
|
||||
Long possible_targeted_adaptation_time = possible_targeted_prediction_times.get(i);
|
||||
if (!adaptation_times_pending_processing.contains(possible_targeted_adaptation_time)){
|
||||
adaptation_times.remove(possible_targeted_adaptation_time);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing targeted prediction time "+possible_targeted_adaptation_time+" as it is going to be used");
|
||||
adaptation_times_pending_processing.add(possible_targeted_adaptation_time);
|
||||
return possible_targeted_adaptation_time;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static void clean_data(HashSet<Long> adaptation_times_to_remove) {
|
||||
for (Long processed_adaptation_time:adaptation_times_to_remove){
|
||||
if (processed_adaptation_time>last_processed_adaptation_time){
|
||||
last_processed_adaptation_time = processed_adaptation_time;
|
||||
}
|
||||
synchronized (Main.can_modify_slo_rules) {
|
||||
while (!Main.can_modify_slo_rules.getValue()) {
|
||||
first_run = false;
|
||||
synchronized (can_modify_slo_rules) {
|
||||
do {
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}catch (InterruptedException i){
|
||||
i.printStackTrace();
|
||||
}
|
||||
}
|
||||
}while((!can_modify_slo_rules.getValue()) || (!slo_rule_arrived.get()));
|
||||
can_modify_slo_rules.setValue(false);
|
||||
for (SLORule slo_rule : slo_rules) {
|
||||
for (SLOSubRule subrule : slo_rule.getSlo_subrules()) {
|
||||
if (getPredicted_monitoring_attributes().containsKey(subrule.getId())) {
|
||||
getPredicted_monitoring_attributes().get(subrule.getId()).remove(processed_adaptation_time);
|
||||
}
|
||||
slo_rule_arrived.set(false);
|
||||
String rule_representation = MESSAGE_CONTENTS.get_synchronized_contents(slo_rules_topic);
|
||||
if (slo_rule_arrived_has_updated_version(rule_representation)) {
|
||||
if (single_slo_rule_active) {
|
||||
slo_rules.clear();
|
||||
}
|
||||
|
||||
ArrayList<String> additional_metrics_from_new_slo = get_metric_list_from_JSON_slo(rule_representation);
|
||||
|
||||
if (additional_metrics_from_new_slo.size() > 0) {
|
||||
slo_rules.add(new SLORule(rule_representation, additional_metrics_from_new_slo));
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}else{
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
continue;
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
|
||||
stop_all_running_threads();
|
||||
DebugDataSubscription.initiate(prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_monitoring_datastructures_with_empty_data(slo_rules);
|
||||
//
|
||||
initialize_subrule_and_attribute_associations(slo_rules);
|
||||
initialize_attribute_subscribers(slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_slo_processing(slo_rules);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function determines the probability of an SLO violation
|
||||
* @param rule_severity The severity of the rule which has been determined
|
||||
* @return The probability of the rule being violated. The minimum value of this probability is 0, and increases as the severity increases
|
||||
*/
|
||||
public static double determine_slo_violation_probability(double rule_severity) {
|
||||
if (slo_violation_determination_method.equals("all-metrics")) {
|
||||
//39.64 is the mean severity value when examining all integer severity values for roc x probability x confidence_interval x delta_value in (-100,100)x(0,100)x(0,100)x(-100,100)
|
||||
/*
|
||||
if (rule_severity >= 40) {
|
||||
return Math.min((50 + 50*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 50%
|
||||
// return Math.min((100*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 0%
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
||||
*/
|
||||
return Math.min(rule_severity/100,100);
|
||||
}else if (slo_violation_determination_method.equals("prconf-delta")){
|
||||
//Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented");
|
||||
//return 0;
|
||||
if (rule_severity >= 6.52){
|
||||
return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1);
|
||||
}else{
|
||||
return 0;
|
||||
}
|
||||
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Accessor for the shared stop flag that signals running threads to terminate.
 * @return the global {@code AtomicBoolean} stop signal
 */
public static AtomicBoolean getStop_signal() {
    return stop_signal;
}
|
||||
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static slo_processing.SLOSubRule.find_rule_type;
|
||||
import static utilities.SLOViolationDetectorStateUtils.severity_calculation_event_recording_queue;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
|
||||
public class SLORule {
|
||||
@ -225,7 +226,7 @@ public class SLORule {
|
||||
calculation_logging_string.append("\nDue to the severity value being over 10000, it is replaced by 10000");
|
||||
rule_result_value = 10000;
|
||||
}
|
||||
Main.severity_calculation_event_recording_queue.add(calculation_logging_string.toString());
|
||||
severity_calculation_event_recording_queue.add(calculation_logging_string.toString());
|
||||
return rule_result_value;
|
||||
}
|
||||
|
||||
|
@ -5,18 +5,19 @@ import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import runtime.Main;
|
||||
import slo_processing.SLOSubRule;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.amq_library_configuration_location;
|
||||
import static configuration.Constants.info_logging_level;
|
||||
import static runtime.Main.running_threads;
|
||||
import static runtime.Main.slo_violation_event_recording_queue;
|
||||
import static runtime.Main.*;
|
||||
import static utilities.SLOViolationDetectorStateUtils.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread;
|
||||
|
||||
/**
|
||||
* The objective of this class is to allow a structured synopsis of the current state of the SLO Violation Detector to be created, as a response to a request sent to it through an appropriate topic.
|
||||
@ -33,7 +34,18 @@ public class DebugDataSubscription {
|
||||
intermediate_debug_string = new StringBuilder(intermediate_debug_string + "The following threads are currently running" + "\n");
|
||||
|
||||
boolean flag_first_element_iterated = true;
|
||||
for (String s : running_threads.keySet()){
|
||||
intermediate_debug_string.append("Persistent running threads:\n");
|
||||
for (String s : persistent_running_threads.keySet()){
|
||||
if (flag_first_element_iterated) {
|
||||
intermediate_debug_string.append(s);
|
||||
flag_first_element_iterated = false;
|
||||
}else{
|
||||
intermediate_debug_string.append(",\n").append(s);
|
||||
}
|
||||
}
|
||||
flag_first_element_iterated = true;
|
||||
intermediate_debug_string.append("SLO-bound running threads:\n");
|
||||
for (String s : slo_bound_running_threads.keySet()){
|
||||
if (flag_first_element_iterated) {
|
||||
intermediate_debug_string.append(s);
|
||||
flag_first_element_iterated = false;
|
||||
@ -81,7 +93,7 @@ public class DebugDataSubscription {
|
||||
output_debug_data = output_debug_data+intermediate_debug_string;
|
||||
intermediate_debug_string = new StringBuilder();
|
||||
|
||||
output_debug_data = output_debug_data+"\nShowing the adaptation times that pend processing:\n"+ Main.adaptation_times_pending_processing;
|
||||
output_debug_data = output_debug_data+"\nShowing the adaptation times that pend processing:\n"+ adaptation_times_pending_processing;
|
||||
intermediate_debug_string.append("\nThese are the timestamps of the latest adaptation events\n").append(slo_violation_event_recording_queue);
|
||||
|
||||
Logger.getGlobal().log(info_logging_level,"Debug data generated:\n"+output_debug_data);
|
||||
@ -91,11 +103,11 @@ public class DebugDataSubscription {
|
||||
};
|
||||
public static void initiate(String broker_ip_address, String broker_username, String broker_password) {
|
||||
BrokerSubscriber debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location);
|
||||
Thread debug_data_subscription_thread = new Thread(() -> {
|
||||
Runnable debug_data_subscription_runnable = () -> {
|
||||
try {
|
||||
synchronized (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) {
|
||||
synchronized (HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_topic_name).getValue())
|
||||
debug_data_subscriber.subscribe(debug_data_generation, Main.stop_signal);
|
||||
debug_data_subscriber.subscribe(debug_data_generation, stop_signal);
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
@ -107,10 +119,9 @@ public class DebugDataSubscription {
|
||||
}
|
||||
} finally {
|
||||
Logger.getAnonymousLogger().log(info_logging_level, "Removing debug data subscriber thread for " + debug_data_trigger_topic_name);
|
||||
running_threads.remove("debug_data_subscription_thread_" + debug_data_trigger_topic_name);
|
||||
slo_bound_running_threads.remove("debug_data_subscription_thread_" + debug_data_trigger_topic_name);
|
||||
}
|
||||
});
|
||||
running_threads.put("debug_data_subscription_thread_" + debug_data_trigger_topic_name, debug_data_subscription_thread);
|
||||
debug_data_subscription_thread.start();
|
||||
};
|
||||
CharacterizedThread.create_new_thread(debug_data_subscription_runnable,"debug_data_subscription_thread_" + debug_data_trigger_topic_name,slo_bound_running_thread,true);
|
||||
}
|
||||
}
|
||||
|
@ -6,9 +6,11 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utilities;
|
||||
|
||||
import slo_processing.SLOSubRule;
|
||||
import utility_beans.MonitoringAttributeStatistics;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
@ -0,0 +1,20 @@
|
||||
package utilities;
|
||||
|
||||
import utility_beans.OperationalMode;
|
||||
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class OperationalModeUtils{
|
||||
public static OperationalMode getSLOViolationDetectionOperationalMode(String operational_mode) {
|
||||
if (operational_mode.equalsIgnoreCase("DIRECTOR")){
|
||||
return OperationalMode.DIRECTOR;
|
||||
}else if (operational_mode.equalsIgnoreCase("DETECTOR")){
|
||||
return OperationalMode.DETECTOR;
|
||||
}
|
||||
else{
|
||||
Logger.getAnonymousLogger().log(Level.SEVERE,"Creating new SLO Violation Detection instance as a DETECTOR node, however the specification of the type of node whould be DIRECTOR or DETECTOR, not "+operational_mode);
|
||||
return OperationalMode.DIRECTOR;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
package utilities;
|
||||
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import slo_processing.SLORule;
|
||||
import utility_beans.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
|
||||
/**
 * Central holder for the static runtime state of the SLO Violation Detector:
 * the active rules, the two thread registries, adaptation-time bookkeeping,
 * synchronization primitives, and debug recording queues.
 */
public class SLOViolationDetectorStateUtils {
    // All SLO rules currently active in this instance.
    public static ArrayList<SLORule> slo_rules = new ArrayList<>();
    // Threads whose lifetime is bound to the currently active SLO rules
    // (these are the ones terminated by stop_all_running_threads).
    public static HashMap<String,Thread> slo_bound_running_threads = new HashMap<>();
    // Threads which live for the whole lifetime of the component.
    public static HashMap<String,Thread> persistent_running_threads = new HashMap<>();
    // Future adaptation timepoints which still need to be evaluated.
    public static HashSet<Long> adaptation_times = new HashSet<>();
    // Timepoints currently claimed by a severity-calculation thread.
    public static HashSet<Long> adaptation_times_pending_processing = new HashSet<>();
    public static Long last_processed_adaptation_time = -1L;//initialization
    public static int slo_violation_detection_component_instance_identifier;
    private static String self_starting_command_string = "java -jar SLOSeverityCalculator-4.0-SNAPSHOT.jar > $LOG_FILE 2>&1";
    // Role of this instance (DIRECTOR or DETECTOR).
    public static OperationalMode operational_mode;
    // Global flag used to ask slo-bound threads to terminate.
    public static final AtomicBoolean stop_signal = new AtomicBoolean(false);
    public static final SynchronizedInteger create_new_slo_detector = new SynchronizedInteger(0);
    public static final SynchronizedBoolean PREDICTION_EXISTS = new SynchronizedBoolean(false);
    // Guards modifications of the adaptation_times collections.
    public static final SynchronizedBoolean ADAPTATION_TIMES_MODIFY = new SynchronizedBoolean(true);
    public static SynchronizedBooleanMap HAS_MESSAGE_ARRIVED = new SynchronizedBooleanMap();
    public static SynchronizedStringMap MESSAGE_CONTENTS = new SynchronizedStringMap();

    public static final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false);
    // Guards modifications of slo_rules; waited on with wait/notifyAll throughout the codebase.
    public static final SynchronizedBoolean can_modify_slo_rules = new SynchronizedBoolean(false);

    //Debugging variables
    public static CircularFifoQueue<Long> slo_violation_event_recording_queue = new CircularFifoQueue<>(50);
    public static CircularFifoQueue<String> severity_calculation_event_recording_queue = new CircularFifoQueue<>(50);
    public static Properties prop = new Properties();


    /**
     * Opens an InputStream over the component's configuration file.
     * With a null/empty argument the default configuration_file_location (presumably
     * from Constants — its declaration is not visible here) is resolved relative to
     * the current working directory; otherwise the custom path is used.
     * @param custom_properties_file_path optional custom path to a properties file
     * @return an open FileInputStream over the chosen configuration file
     * @throws IOException if the file cannot be opened
     */
    public static InputStream getPreferencesFileInputStream(String custom_properties_file_path) throws IOException {
        if (custom_properties_file_path==null || custom_properties_file_path.equals(EMPTY)) {
            base_project_path = new File(EMPTY).toURI();
            URI absolute_configuration_file_path = new File(configuration_file_location).toURI();
            URI relative_configuration_file_path = base_project_path.relativize(absolute_configuration_file_path);
            Logger.getAnonymousLogger().log(info_logging_level, "This is the base project path:" + base_project_path);
            return new FileInputStream(base_project_path.getPath() + relative_configuration_file_path);
        }else{
            // NOTE(review): base_project_path is only (re)assigned here when it is unset,
            // so a different custom path passed on a later call is ignored — confirm intended.
            if (base_project_path == null || base_project_path.getPath().equals(EMPTY)) {
                base_project_path = new File(custom_properties_file_path).toURI();
            }
            return new FileInputStream(base_project_path.getPath());
        }
    }
}
|
@ -0,0 +1,252 @@
|
||||
package utilities;
|
||||
|
||||
import metric_retrieval.AttributeSubscription;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import slo_processing.SLORule;
|
||||
import slo_processing.SLOSubRule;
|
||||
import utility_beans.CharacterizedThread;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static processing_logic.Runnables.get_severity_calculation_runnable;
|
||||
import static runtime.Main.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread;
|
||||
import static utilities.SLOViolationDetectorStateUtils.*;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
|
||||
public class SLOViolationDetectorUtils {
|
||||
|
||||
public static HashSet<Long> adaptation_times_to_remove = new HashSet<>();
|
||||
|
||||
|
||||
public static void initialize_subrule_and_attribute_associations(ArrayList<SLORule> slo_rules) {
|
||||
synchronized (can_modify_slo_rules) {
|
||||
while (!can_modify_slo_rules.getValue()){
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(false);
|
||||
for (SLORule slo_rule : slo_rules) {
|
||||
for (SLOSubRule subrule : SLORule.parse_subrules(slo_rule.getRule_representation(),slo_rule.getRule_format())) {
|
||||
SLOSubRule.getSlo_subrules_per_monitoring_attribute().computeIfAbsent(subrule.getMetric(), k -> new ArrayList<>());
|
||||
SLOSubRule.getSlo_subrules_per_monitoring_attribute().get(subrule.getMetric()).add(subrule);
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void initialize_monitoring_datastructures_with_empty_data(ArrayList<SLORule> slo_rules){
|
||||
for(SLORule slo_rule: slo_rules){
|
||||
for (String metric_name : slo_rule.get_monitoring_attributes()) {
|
||||
MonitoringAttributeUtilities.initialize_values(metric_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void initialize_slo_processing(ArrayList<SLORule> rules_list){
|
||||
|
||||
for (SLORule rule:rules_list) {
|
||||
|
||||
String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString();
|
||||
CharacterizedThread.create_new_thread(get_severity_calculation_runnable(rule),severity_calculation_thread_name, slo_bound_running_thread,true);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void clean_data(HashSet<Long> adaptation_times_to_remove) {
|
||||
for (Long processed_adaptation_time:adaptation_times_to_remove){
|
||||
if (processed_adaptation_time>last_processed_adaptation_time){
|
||||
last_processed_adaptation_time = processed_adaptation_time;
|
||||
}
|
||||
synchronized (can_modify_slo_rules) {
|
||||
while (!can_modify_slo_rules.getValue()) {
|
||||
try {
|
||||
can_modify_slo_rules.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(false);
|
||||
for (SLORule slo_rule : slo_rules) {
|
||||
for (SLOSubRule subrule : slo_rule.getSlo_subrules()) {
|
||||
if (getPredicted_monitoring_attributes().containsKey(subrule.getId())) {
|
||||
getPredicted_monitoring_attributes().get(subrule.getId()).remove(processed_adaptation_time);
|
||||
}
|
||||
}
|
||||
}
|
||||
can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Long get_next_targeted_prediction_time() {
|
||||
List<Long> possible_targeted_prediction_times = adaptation_times.stream().sorted().limit(maximum_acceptable_forward_predictions).collect(Collectors.toList());
|
||||
for (int i=0; i<possible_targeted_prediction_times.size(); i++){
|
||||
Long possible_targeted_adaptation_time = possible_targeted_prediction_times.get(i);
|
||||
if (!adaptation_times_pending_processing.contains(possible_targeted_adaptation_time)){
|
||||
adaptation_times.remove(possible_targeted_adaptation_time);
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Removing targeted prediction time "+possible_targeted_adaptation_time+" as it is going to be used");
|
||||
adaptation_times_pending_processing.add(possible_targeted_adaptation_time);
|
||||
return possible_targeted_adaptation_time;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static ArrayList<AttributeSubscription> initialize_attribute_subscribers(ArrayList<SLORule> rules_list, String broker_ip_address, String broker_username, String broker_password){
|
||||
ArrayList<AttributeSubscription> attribute_subscribers = new ArrayList<>();
|
||||
for (SLORule rule:rules_list){
|
||||
attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password));
|
||||
}
|
||||
return attribute_subscribers;
|
||||
}
|
||||
|
||||
public static AtomicBoolean getStop_signal() {
|
||||
return stop_signal;
|
||||
}
|
||||
|
||||
public static boolean slo_rule_arrived_has_updated_version(String rule_representation) {
|
||||
JSONObject json_object = null;
|
||||
long json_object_version = Integer.MAX_VALUE;
|
||||
try {
|
||||
json_object = (JSONObject) new JSONParser().parse(rule_representation);
|
||||
json_object_version = (Long) json_object.get("version");
|
||||
} catch (NullPointerException n){
|
||||
n.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Unfortunately a null message was sent to the SLO Violation Detector, which is being ignored");
|
||||
return false;
|
||||
} catch (Exception e){
|
||||
e.printStackTrace();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Could not parse the JSON of the new SLO, assuming it is not an updated rule...");
|
||||
return false;
|
||||
}
|
||||
if (json_object_version > current_slo_rules_version){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"An SLO with updated version ("+json_object_version+" vs older "+current_slo_rules_version+") has arrived");
|
||||
current_slo_rules_version=json_object_version;
|
||||
return true;
|
||||
}else {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Taking no action for the received SLO message as the version number is not updated");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static void stop_all_running_threads() {
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Asking previously existing threads to terminate");
|
||||
int initial_number_of_running_threads = slo_bound_running_threads.size();
|
||||
while (slo_bound_running_threads.size()>0) {
|
||||
synchronized (stop_signal) {
|
||||
stop_signal.set(true);
|
||||
stop_signal.notifyAll();
|
||||
}
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
slo_bound_running_threads.values().forEach(Thread::interrupt);
|
||||
}catch (Exception e){
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Stopped "+(initial_number_of_running_threads- slo_bound_running_threads.size())+"/"+initial_number_of_running_threads+" already running threads");
|
||||
if (slo_bound_running_threads.size()>1){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The threads which are still running are the following: "+ slo_bound_running_threads);
|
||||
}else if (slo_bound_running_threads.size()>0){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"The thread which is still running is the following: "+ slo_bound_running_threads);
|
||||
}
|
||||
}
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"All threads have terminated");
|
||||
synchronized (stop_signal) {
|
||||
stop_signal.set(false);
|
||||
}
|
||||
synchronized (PREDICTION_EXISTS){
|
||||
PREDICTION_EXISTS.setValue(false);
|
||||
}
|
||||
adaptation_times.clear();
|
||||
}
|
||||
|
||||
public static ArrayList<String> get_metric_list_from_JSON_slo(String json_object_string) {
|
||||
HashSet<String> metric_list = new HashSet<>();
|
||||
try {
|
||||
JSONObject json_object = (JSONObject) new JSONParser().parse(json_object_string);
|
||||
String json_object_id = (String) json_object.get("id");
|
||||
String json_object_name = (String) json_object.get("name");
|
||||
//Older format uses id-based fields, newer format uses a non-variable structure
|
||||
//We first check if an event using the older format is sent, and then check if the event is sent using the newer format
|
||||
if (json_object_id!=null) {
|
||||
if (json_object_id.split("-").length > 1) {
|
||||
//String composite_rule_type = json_object_id.split("-")[0];
|
||||
JSONArray internal_json_slos = (JSONArray) json_object.get(json_object_id);
|
||||
for (Object o : internal_json_slos) {
|
||||
JSONObject internal_json_slo = (JSONObject) o;
|
||||
metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString()));
|
||||
}
|
||||
} else {
|
||||
metric_list.add((String) json_object.get("attribute"));
|
||||
}
|
||||
}
|
||||
//If newer format is used
|
||||
else if (json_object_name!=null){
|
||||
JSONArray internal_json_slos = (JSONArray) json_object.get("constraints");
|
||||
if ((internal_json_slos!=null) && (internal_json_slos.size()>0)){
|
||||
for (Object o : internal_json_slos) {
|
||||
JSONObject internal_json_slo = (JSONObject) o;
|
||||
metric_list.addAll(get_metric_list_from_JSON_slo(internal_json_slo.toJSONString()));
|
||||
}
|
||||
}else{
|
||||
metric_list.add((String) json_object.get("metric"));
|
||||
}
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(Level.INFO,"An SLO rule was sent in a format which could not be fully parsed, therefore ignoring this rule. The non-understandable part of the SLO rule is printed below"+"\n"+json_object_string);
|
||||
}
|
||||
}catch (Exception p){
|
||||
p.printStackTrace();
|
||||
return new ArrayList<String>();
|
||||
}
|
||||
return new ArrayList<String>(metric_list);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function determines the probability of an SLO violation
|
||||
* @param rule_severity The severity of the rule which has been determined
|
||||
* @return The probability of the rule being violated. The minimum value of this probability is 0, and increases as the severity increases
|
||||
*/
|
||||
public static double determine_slo_violation_probability(double rule_severity) {
|
||||
if (slo_violation_determination_method.equals("all-metrics")) {
|
||||
//39.64 is the mean severity value when examining all integer severity values for roc x probability x confidence_interval x delta_value in (-100,100)x(0,100)x(0,100)x(-100,100)
|
||||
/*
|
||||
if (rule_severity >= 40) {
|
||||
return Math.min((50 + 50*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 50%
|
||||
// return Math.min((100*(rule_severity - 40) / 60)/100,1); // in case we desire the probability to start from 0%
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
||||
*/
|
||||
return Math.min(rule_severity/100,100);
|
||||
}else if (slo_violation_determination_method.equals("prconf-delta")){
|
||||
//Logger.getAnonymousLogger().log(warning_logging_level,"The calculation of probability for the prconf-delta method needs to be implemented");
|
||||
//return 0;
|
||||
if (rule_severity >= 6.52){
|
||||
return Math.min((50+50*(rule_severity-6.52)/93.48)/100,1);
|
||||
}else{
|
||||
return 0;
|
||||
}
|
||||
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(warning_logging_level,"Unknown severity calculation method");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package utility_beans;
|
||||
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static utilities.SLOViolationDetectorStateUtils.persistent_running_threads;
|
||||
import static utilities.SLOViolationDetectorStateUtils.slo_bound_running_threads;
|
||||
|
||||
public class CharacterizedThread{
|
||||
public enum CharacterizedThreadType{
|
||||
slo_bound_running_thread,persistent_running_thread,undefined
|
||||
}
|
||||
|
||||
public enum CharacterizedThreadRunMode{
|
||||
attached,detached
|
||||
}
|
||||
|
||||
public static Thread create_new_thread(Runnable runnable, String thread_name, CharacterizedThreadType thread_type, boolean start_thread_now){
|
||||
Thread thread = new Thread(runnable);
|
||||
thread.setName(thread_name);
|
||||
if (thread_type.equals(CharacterizedThreadType.slo_bound_running_thread)){
|
||||
slo_bound_running_threads.put(thread_name,thread);
|
||||
}else if (thread_type.equals(CharacterizedThreadType.persistent_running_thread)){
|
||||
persistent_running_threads.put(thread_name,thread);
|
||||
}else{
|
||||
Logger.getAnonymousLogger().log(Level.WARNING,"Undefined type of thread for thread with name: "+thread_name);
|
||||
}
|
||||
if (start_thread_now) {
|
||||
thread.start();
|
||||
}
|
||||
return thread;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package utility_beans;
|
||||
|
||||
/**
 * The role the SLO Violation Detector assumes at startup: DIRECTOR
 * coordinates detector instances, while DETECTOR performs plain SLO
 * violation detection. DIRECTOR is the default when no role is given
 * on the command line.
 */
public enum OperationalMode {
    DIRECTOR,DETECTOR
}
|
@ -13,7 +13,7 @@ import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utility_beans.MonitoringAttributeUtilities.isZero;
|
||||
import static utilities.MonitoringAttributeUtilities.isZero;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.*;
|
||||
|
||||
public class PredictedMonitoringAttribute {
|
||||
|
@ -0,0 +1,19 @@
|
||||
package utility_beans;
|
||||
|
||||
public class SynchronizedInteger {
|
||||
private Integer value;
|
||||
public SynchronizedInteger(Integer value){
|
||||
this.value = value;
|
||||
}
|
||||
public SynchronizedInteger(){
|
||||
this(0);
|
||||
}
|
||||
|
||||
public Integer getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void setValue(Integer value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
Manifest-Version: 1.0
|
||||
Main-Class: runtime.Main
|
||||
|
@ -0,0 +1 @@
|
||||
spring.profiles.active=production
|
@ -8,13 +8,11 @@
|
||||
|
||||
import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import org.apache.commons.lang3.mutable.MutableBoolean;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
@ -26,7 +24,7 @@ import java.util.function.BiFunction;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static runtime.Main.running_threads;
|
||||
import static utilities.SLOViolationDetectorStateUtils.slo_bound_running_threads;
|
||||
|
||||
|
||||
public class ConnectivityTests {
|
||||
@ -74,7 +72,7 @@ public class ConnectivityTests {
|
||||
subscriber.subscribe(slo_function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
});
|
||||
subscription_thread.start();
|
||||
running_threads.put("Test topic subscription thread",subscription_thread);
|
||||
slo_bound_running_threads.put("Test topic subscription thread",subscription_thread);
|
||||
|
||||
publisher.publish(object_to_publish.toJSONString());
|
||||
try {
|
||||
|
@ -8,7 +8,6 @@
|
||||
|
||||
import org.junit.Test;
|
||||
import utility_beans.MonitoringAttributeStatistics;
|
||||
import utility_beans.MonitoringAttributeUtilities;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
|
||||
|
@ -19,7 +19,7 @@ import org.junit.Test;
|
||||
import runtime.Main;
|
||||
import slo_processing.SLORule;
|
||||
import slo_processing.SLOSubRule;
|
||||
import utility_beans.MonitoringAttributeUtilities;
|
||||
import utilities.MonitoringAttributeUtilities;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
|
||||
@ -39,9 +39,9 @@ import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static runtime.Main.PREDICTION_EXISTS;
|
||||
import static runtime.Main.initialize_subrule_and_attribute_associations;
|
||||
import static slo_processing.SLORule.process_rule_value;
|
||||
import static utilities.SLOViolationDetectorUtils.initialize_subrule_and_attribute_associations;
|
||||
import static utilities.SLOViolationDetectorStateUtils.*;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.*;
|
||||
|
||||
@ -81,7 +81,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
|
||||
public void unbounded_monitoring_attribute_test_core(String json_file_name, String metric_1_name, Double[] metric_lower_bound_range, Double[] metric_upper_bound_range, double severity_lower_bound, double base_metric_value, double metric_max_value, double forecasted_metric_value,double generated_data_confidence_interval, double probability) throws IOException, ParseException {
|
||||
|
||||
Main.can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.setValue(true);
|
||||
Properties prop = new Properties();
|
||||
|
||||
URI absolute_configuration_file_path = new File(configuration_file_location).toURI();
|
||||
@ -153,15 +153,15 @@ public class UnboundedMonitoringAttributeTests {
|
||||
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.prediction_time)).longValue();
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
|
||||
synchronized (Main.can_modify_slo_rules) {
|
||||
if(!Main.can_modify_slo_rules.getValue()) {
|
||||
Main.can_modify_slo_rules.wait();
|
||||
synchronized (can_modify_slo_rules) {
|
||||
if(!can_modify_slo_rules.getValue()) {
|
||||
can_modify_slo_rules.wait();
|
||||
}
|
||||
Main.can_modify_slo_rules.setValue(false);
|
||||
can_modify_slo_rules.setValue(false);
|
||||
|
||||
if( Main.adaptation_times.size()==0 || (!Main.adaptation_times.contains(targeted_prediction_time)) && targeted_prediction_time>Main.adaptation_times.stream().min(Long::compare).get()){
|
||||
if( adaptation_times.size()==0 || (!adaptation_times.contains(targeted_prediction_time)) && targeted_prediction_time>adaptation_times.stream().min(Long::compare).get()){
|
||||
Logger.getAnonymousLogger().log(info_logging_level,"Adding a new targeted prediction time "+targeted_prediction_time);
|
||||
Main.adaptation_times.add(targeted_prediction_time);
|
||||
adaptation_times.add(targeted_prediction_time);
|
||||
synchronized (PREDICTION_EXISTS) {
|
||||
PREDICTION_EXISTS.setValue(true);
|
||||
PREDICTION_EXISTS.notifyAll();
|
||||
@ -181,7 +181,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
getPredicted_monitoring_attributes().get(subrule.getId()).put(targeted_prediction_time, prediction_attribute);
|
||||
}
|
||||
}
|
||||
Main.can_modify_slo_rules.setValue(true);
|
||||
can_modify_slo_rules.setValue(true);
|
||||
}
|
||||
//SLOViolationCalculator.get_Severity_all_metrics_method(prediction_attribute)
|
||||
|
||||
@ -191,7 +191,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
return message;
|
||||
};
|
||||
Thread forecasted_subscription_thread = new Thread(() -> {
|
||||
synchronized (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) {
|
||||
synchronized (HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name).getValue())
|
||||
forecasted_subscriber.subscribe(forecasted_function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user