Refactoring to ensure compatibility with Nebulous

Better handling of incoming monitoring messages
Improvement of the handling of incoming debug data generation requests
Major functionality updates to BrokerPublisher and BrokerSubscriber
Corrections in the calculation of Severity for the prconf-delta case

Change-Id: I94a6fdb4612de292c24511445f1236cdce94b365
This commit is contained in:
Andreas Tsagkaropoulos 2024-01-11 14:07:28 +02:00
parent 94672e4461
commit a3e2ada8dc
16 changed files with 287 additions and 125 deletions

View File

@ -9,7 +9,6 @@
package configuration;
import java.net.URI;
import java.util.ArrayList;
import java.util.logging.Level;
public class Constants {
@ -17,7 +16,7 @@ public class Constants {
public static String EMPTY = "";
public static String SPACE = " ";
public static Double LOWER_LIMIT_DELTA = - 100.0;
public static String NAME_SEPARATOR = "#";
//Operational constants
public static String slo_violation_determination_method;
public static int time_horizon_seconds;
@ -32,6 +31,8 @@ public class Constants {
public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost";
public static String slo_rules_topic = "eu.nebulouscloud.monitoring.slo.new";
public static String metric_list_topic = "eu.nebulouscloud.monitoring.metric_list";
public static String realtime_metrics_topic = "eu.nebulouscloud.monitoring.realtime.";
public static String final_metric_prediction_topic = "eu.nebulouscloud.monitoring.predicted.";
public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection
public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation
public static String roc_calculation_mode = "prototype";

View File

@ -28,6 +28,7 @@ import utility_beans.RealtimeMonitoringAttribute;
import java.time.Clock;
import java.util.HashMap;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.logging.Logger;
import static configuration.Constants.*;
@ -47,11 +48,12 @@ public class AttributeSubscription {
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
BiFunction<String,String,String> function = (topic, message) ->{
RealtimeMonitoringAttribute realtimeMonitoringAttribute = new RealtimeMonitoringAttribute(topic);
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(topic)) {
String metric_name_from_topic = topic.replace("eu.nebulouscloud.monitoring.realtime.",EMPTY);
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(metric_name_from_topic)) {
try {
update_monitoring_attribute_value(detector,topic,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue());
update_monitoring_attribute_value(detector,metric_name_from_topic,((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue());
Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+topic+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue")));
Logger.getGlobal().log(info_logging_level,"RECEIVED message with value for "+metric_name_from_topic+" equal to "+(((JSONObject)new JSONParser().parse(message)).get("metricValue")));
} catch (ParseException e) {
e.printStackTrace();
Logger.getGlobal().log(info_logging_level,"A parsing exception was caught while parsing message: "+message);
@ -78,7 +80,7 @@ public class AttributeSubscription {
detector.getSubcomponent_state().slo_bound_running_threads.remove("realtime_subscriber_thread_" + realtime_metric_topic_name);
}
};
CharacterizedThread.create_new_thread(realtime_subscription_runnable,"realtime_subscriber_thread_"+realtime_metric_topic_name,true,detector);
CharacterizedThread.create_new_thread(realtime_subscription_runnable,"realtime_subscriber_thread_"+realtime_metric_topic_name,true,detector, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
@ -87,12 +89,14 @@ public class AttributeSubscription {
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location);
BiFunction<String,String,String> forecasted_function = (topic,message) ->{
String predicted_attribute_name = topic.replaceFirst("prediction\\.",EMPTY);
String predicted_attribute_name = topic.replaceFirst("eu\\.nebulouscloud\\.monitoring\\.predicted\\.",EMPTY);
HashMap<Integer, HashMap<Long,PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
try {
double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.metric_value)).doubleValue();
double probability_confidence = 100*((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.probability)).doubleValue();
JSONArray json_array_confidence_interval = ((JSONArray)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.confidence_interval));
JSONObject json_message = (JSONObject)(new JSONParser().parse(message));
Logger.getGlobal().log(Level.INFO,"Getting information for "+EventFields.PredictionMetricEventFields.metricValue);
double forecasted_value = ((Number)json_message.get(EventFields.PredictionMetricEventFields.metricValue.name())).doubleValue();
double probability_confidence = 100*((Number)json_message.get(EventFields.PredictionMetricEventFields.probability.name())).doubleValue();
JSONArray json_array_confidence_interval = (JSONArray)(json_message.get(EventFields.PredictionMetricEventFields.confidence_interval.name()));
double confidence_interval;
try{
@ -102,8 +106,8 @@ public class AttributeSubscription {
c.printStackTrace();
confidence_interval = Double.NEGATIVE_INFINITY;
}
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue();
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.prediction_time)).longValue();
long timestamp = ((Number)json_message.get(EventFields.PredictionMetricEventFields.timestamp.name())).longValue();
long targeted_prediction_time = ((Number)json_message.get(EventFields.PredictionMetricEventFields.predictionTime.name())).longValue();
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
@ -171,6 +175,7 @@ public class AttributeSubscription {
Logger.getGlobal().log(info_logging_level,"Error while trying to parse message\n"+message);
} catch (Exception e){
Logger.getGlobal().log(info_logging_level,"An unknown exception was caught\n"+message);
e.printStackTrace();
}
return message;
};
@ -194,7 +199,7 @@ public class AttributeSubscription {
detector.getSubcomponent_state().persistent_running_detector_threads.remove("forecasting_subscriber_thread_"+forecasted_metric_topic_name);
}
};
CharacterizedThread.create_new_thread(forecasted_subscription_runnable, "forecasting_subscriber_thread_" + forecasted_metric_topic_name, true,detector);
CharacterizedThread.create_new_thread(forecasted_subscription_runnable, "forecasting_subscriber_thread_" + forecasted_metric_topic_name, true,detector, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
}
}
}

View File

@ -2,10 +2,13 @@ package runtime;
import org.springframework.web.bind.annotation.*;
import slo_violation_detector_engine.detector.DetectorSubcomponent;
import utility_beans.BrokerSubscriptionDetails;
import static configuration.Constants.EMPTY;
import static configuration.Constants.default_application_name;
import static runtime.Main.detectors;
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_integer_id;
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
import static utilities.DebugDataSubscription.debug_data_generation;
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
@RestController
@ -21,12 +24,13 @@ public class DetectorRequestMappings {
//TODO refine calls to debug_data_generation below, once the interface to AMQP is available
@GetMapping("/component-statistics")
public static String get_component_statistics() {
debug_data_generation.apply("","");
debug_data_generation.apply(new BrokerSubscriptionDetails(false),EMPTY);
return "Debug data generation was successful";
}
@GetMapping("/component-statistics/detectors/{id}")
public static String get_detector_subcomponent_statistics(@PathVariable String id) {
debug_data_generation.apply(id,"");
String detector_name = "detector_"+id;
debug_data_generation.apply(detector_subcomponents.get(detector_name).getBrokerSubscriptionDetails(),EMPTY);
return DetectorSubcomponent.get_detector_subcomponent_statistics();
}
}

View File

@ -34,10 +34,14 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
public final AtomicBoolean slo_rule_arrived = new AtomicBoolean(false);
public Long last_processed_adaptation_time = -1L;//initialization
private String detector_name;
private static String broker_ip = prop.getProperty("broker_ip_url");
private static String broker_username = prop.getProperty("broker_username");
private static String broker_password = prop.getProperty("broker_password");
public DetectorSubcomponent(String application_name, CharacterizedThread.CharacterizedThreadRunMode characterized_thread_run_mode) {
super.thread_type = CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread;
super.thread_type = CharacterizedThread.CharacterizedThreadType.persistent_running_detector_thread;
subcomponent_state = new DetectorSubcomponentState();
Integer current_detector_id;
synchronized (detector_integer_id){
@ -49,13 +53,14 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
detector_integer_id.setValue(detector_integer_id.getValue()+1);
current_detector_id = detector_integer_id.getValue();
//detector_integer_id.notify();
detector_name = "detector_"+current_detector_id;
}
if (characterized_thread_run_mode.equals(attached)) {
DetectorSubcomponentUtilities.run_slo_violation_detection_engine(this);
}else/*detached mode*/{
CharacterizedThread.create_new_thread(new Runnables.SLODetectionEngineRunnable(this), "detector_"+current_detector_id+"_master_thread", true,this);
CharacterizedThread.create_new_thread(new Runnables.SLODetectionEngineRunnable(this), detector_name+"_master_thread", true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_detector_thread);
}
detector_subcomponents.put(application_name+"_"+current_detector_id,this);
detector_subcomponents.put(detector_name,this);
}
public BiFunction<String, String, String> slo_rule_topic_subscriber_function = (topic, message) -> {
@ -84,8 +89,20 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
metric_list_object = (JSONObject) parser.parse(message);
for (Object element : (JSONArray) metric_list_object.get("metric_list")){
metric_name = (String)((JSONObject)element).get("name");
lower_bound = (Double)((JSONObject)element).get("lower_bound");
upper_bound = (Double)((JSONObject)element).get("upper_bound");
String lower_bound_str = (String)((JSONObject)element).get("lower_bound");
String upper_bound_str = (String)((JSONObject)element).get("upper_bound");
if (!(lower_bound_str.toLowerCase().equals("-inf") || lower_bound_str.toLowerCase().equals("-infinity"))){
lower_bound = Double.parseDouble(lower_bound_str);
}else{
lower_bound = Double.NEGATIVE_INFINITY;
}
if (!(upper_bound_str.toLowerCase().equals("inf") || upper_bound_str.toLowerCase().equals("infinity"))){
upper_bound = Double.parseDouble(upper_bound_str);
}else{
upper_bound = Double.POSITIVE_INFINITY;
}
subcomponent_state.getMonitoring_attributes().put(metric_name,new RealtimeMonitoringAttribute(metric_name,lower_bound,upper_bound));
}
}catch (Exception e){
@ -100,9 +117,9 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
}
return "Monitoring metrics message processed";
};
public static BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
public static BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location);
public static BiFunction<String, String, String> device_lost_subscriber_function = (topic, message) -> {
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location);
Clock clock = Clock.systemUTC();
Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0);
@ -130,4 +147,13 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
public void setSubcomponent_state(DetectorSubcomponentState subcomponent_state) {
this.subcomponent_state = subcomponent_state;
}
@Override
public String get_name() {
return detector_name;
}
public BrokerSubscriptionDetails getBrokerSubscriptionDetails() {
return new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password);
}
}

View File

@ -18,13 +18,13 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import static configuration.Constants.*;
import static slo_violation_detector_engine.generic.Runnables.device_lost_topic_subscriber_runnable;
import static slo_violation_detector_engine.detector.DetectorSubcomponent.device_lost_subscriber;
import static slo_violation_detector_engine.detector.DetectorSubcomponent.device_lost_subscriber_function;
import static slo_violation_detector_engine.generic.Runnables.get_severity_calculation_runnable;
import static runtime.Main.*;
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
@ -68,7 +68,7 @@ public class DetectorSubcomponentUtilities {
for (SLORule rule:rules_list) {
String severity_calculation_thread_name = "severity_calculation_thread_"+rule.toString();
CharacterizedThread.create_new_thread(get_severity_calculation_runnable(rule,rule.getAssociated_detector()),severity_calculation_thread_name, true,rule.getAssociated_detector());
CharacterizedThread.create_new_thread(get_severity_calculation_runnable(rule,rule.getAssociated_detector()),severity_calculation_thread_name, true,rule.getAssociated_detector(), CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
}
}
@ -162,6 +162,9 @@ public class DetectorSubcomponentUtilities {
try {
Thread.sleep(3000);
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.values().forEach(Thread::interrupt);
for (Thread thread : associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.values()) {
thread.join();
}
}catch (Exception e){
}
Logger.getGlobal().log(info_logging_level,"Stopped "+(initial_number_of_running_threads- associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.size())+"/"+initial_number_of_running_threads+" already running threads");
@ -267,39 +270,69 @@ public class DetectorSubcomponentUtilities {
//Metric list subscription thread
BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
Runnable metric_list_topic_subscriber_runnable = () -> {
while (true) {
metric_list_subscriber.subscribe(associated_detector_subcomponent.metric_list_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
boolean did_not_finish_execution_gracefully = true;
while (did_not_finish_execution_gracefully) {
int exit_status = metric_list_subscriber.subscribe(associated_detector_subcomponent.metric_list_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
if (exit_status!=0) {
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
try {
Thread.sleep(10000);
}catch (InterruptedException i){
Logger.getGlobal().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
} catch (InterruptedException i) {
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
}
}else{
did_not_finish_execution_gracefully = false;
}
}
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
};
CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,associated_detector_subcomponent);
CharacterizedThread.create_new_thread(metric_list_topic_subscriber_runnable,"metric_list_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
//SLO rule subscription thread
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
Runnable slo_rules_topic_subscriber_runnable = () -> {
while (true) {
slo_rule_topic_subscriber.subscribe(associated_detector_subcomponent.slo_rule_topic_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
boolean did_not_finish_execution_gracefully = true;
while (did_not_finish_execution_gracefully) {
int exit_status = slo_rule_topic_subscriber.subscribe(associated_detector_subcomponent.slo_rule_topic_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals
if (exit_status!=0) {
Logger.getGlobal().log(info_logging_level, "Broker unavailable, will try to reconnect after 10 seconds");
try {
Thread.sleep(10000);
}catch (InterruptedException i){
Logger.getGlobal().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
} catch (InterruptedException i) {
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
}
}else{
did_not_finish_execution_gracefully = false;
}
}
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
};
CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread",true,associated_detector_subcomponent);
CharacterizedThread.create_new_thread(slo_rules_topic_subscriber_runnable,"slo_rules_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.persistent_running_detector_thread);
//Implementation of 'Lost edge device' thread
CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,associated_detector_subcomponent);
Runnable device_lost_topic_subscriber_runnable = () -> {
boolean did_not_finish_execution_gracefully = true;
while (did_not_finish_execution_gracefully) {
int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, associated_detector_subcomponent.stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false)
if (exit_status!=0) {
Logger.getGlobal().log(info_logging_level, "A device used by the platform was lost, will therefore trigger a reconfiguration");
try {
Thread.sleep(10000);
} catch (InterruptedException i) {
Logger.getGlobal().log(info_logging_level, "Sleep was interrupted, will immediately try to connect to the broker");
}
}else{
did_not_finish_execution_gracefully = false;
}
}
associated_detector_subcomponent.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
};
CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,associated_detector_subcomponent, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
if (self_publish_rule_file) {

View File

@ -12,9 +12,10 @@ import static utilities.OperationalModeUtils.get_director_subscription_topics;
public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
public HashMap<String,Thread> persistent_running_director_threads = new HashMap<>();
public Connector subscribing_connector;
Integer id = 1;
private Integer id = 1;
public static HashMap<String,DirectorSubcomponent> director_subcomponents = new HashMap<>();
private static DirectorSubcomponent master_director;
private String director_name;
public static DirectorSubcomponent getMaster_director() {
return master_director;
@ -27,9 +28,10 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
public DirectorSubcomponent(){
super.thread_type = CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread;
create_director_topic_subscribers();
director_subcomponents.put(String.valueOf(id),this);
id++;
director_name = "director_"+id;
director_subcomponents.put(director_name,this);
master_director = this;
id++;
}
private void create_director_topic_subscribers(){
@ -42,4 +44,9 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
}
//subscribing_connector = new Connector("slovid_director",)
}
@Override
public String get_name() {
return director_name;
}
}

View File

@ -6,19 +6,18 @@ import slo_violation_detector_engine.detector.DetectorSubcomponentUtilities;
import utility_beans.BrokerPublisher;
import org.json.simple.JSONObject;
import slo_rule_modelling.SLORule;
import utility_beans.BrokerSubscriber;
import utility_beans.CharacterizedThread;
import java.sql.Timestamp;
import java.time.Clock;
import java.util.Date;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import static configuration.Constants.*;
import static java.lang.Thread.sleep;
import static slo_rule_modelling.SLORule.process_rule_value;
import static slo_violation_detector_engine.detector.DetectorSubcomponent.*;
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
import static slo_violation_detector_engine.detector.DetectorSubcomponentUtilities.*;
import static utilities.DebugDataSubscription.*;
@ -35,6 +34,7 @@ public class Runnables {
try {
synchronized (detector.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) {
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_topic_name).getValue())
debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, detector.getBrokerSubscriptionDetails().getBroker_ip(),detector.getBrokerSubscriptionDetails().getBroker_username(),detector.getBrokerSubscriptionDetails().getBroker_password(), amq_library_configuration_location);
debug_data_subscriber.subscribe(debug_data_generation, detector.stop_signal);
}
if (Thread.interrupted()) {
@ -52,18 +52,6 @@ public class Runnables {
}
}
public static Runnable device_lost_topic_subscriber_runnable = () -> {
while (true) {
device_lost_subscriber.subscribe(device_lost_subscriber_function, new AtomicBoolean(false)); //This subscriber should not be immune to stop signals
Logger.getGlobal().log(info_logging_level,"A device used by the platform was lost, will therefore trigger a reconfiguration");
try {
Thread.sleep(10000);
}catch (InterruptedException i){
Logger.getGlobal().log(info_logging_level,"Sleep was interrupted, will immediately try to connect to the broker");
}
}
};
public static class SLODetectionEngineRunnable implements Runnable {
private DetectorSubcomponent detector;
@ -170,8 +158,9 @@ public class Runnables {
Logger.getGlobal().log(severe_logging_level, "Severity calculation thread for epoch time " + targeted_prediction_time + " interrupted, stopping...");
return;
}
detector.getSubcomponent_state().slo_bound_running_threads.remove(Thread.currentThread().getName().split(NAME_SEPARATOR)[0]);
};
CharacterizedThread.create_new_thread(internal_severity_calculation_runnable, "internal_severity_calculation_thread_" + targeted_prediction_time, true,detector);
CharacterizedThread.create_new_thread(internal_severity_calculation_runnable, "internal_severity_calculation_thread_" + targeted_prediction_time, true,detector, CharacterizedThread.CharacterizedThreadType.slo_bound_running_thread);
} catch (NoSuchElementException n) {
Logger.getGlobal().log(warning_logging_level, "Could not calculate severity as a value was missing...");
continue;

View File

@ -4,5 +4,5 @@ import utility_beans.CharacterizedThread;
public abstract class SLOViolationDetectorSubcomponent {
public CharacterizedThread.CharacterizedThreadType thread_type;
public abstract String get_name();
}

View File

@ -4,20 +4,16 @@ package utilities;
//import eu.melodic.event.brokerclient.BrokerSubscriber;
import slo_violation_detector_engine.generic.Runnables;
import slo_violation_detector_engine.detector.DetectorSubcomponent;
import utility_beans.BrokerSubscriber;
import utility_beans.*;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import slo_rule_modelling.SLOSubRule;
import utility_beans.BrokerPublisher;
import utility_beans.CharacterizedThread;
import utility_beans.RealtimeMonitoringAttribute;
import java.util.ArrayList;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import static configuration.Constants.amq_library_configuration_location;
import static configuration.Constants.info_logging_level;
import static configuration.Constants.*;
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
import static slo_violation_detector_engine.director.DirectorSubcomponent.director_subcomponents;
import static utility_beans.RealtimeMonitoringAttribute.get_metric_value;
@ -30,7 +26,7 @@ public class DebugDataSubscription {
public static String debug_data_trigger_topic_name = "sloviolationdetector.debug";
public static String debug_data_output_topic_name = "sloviolationdetector.debug_output";
private static String broker_username,broker_password,broker_ip_address;
public static BiFunction <String,String,String> debug_data_generation = (topic, message) ->{
public static BiFunction <BrokerSubscriptionDetails,String,String> debug_data_generation = (broker_subscription_details, message) ->{
String output_debug_data = "";
StringBuilder intermediate_debug_string = new StringBuilder();
@ -132,14 +128,22 @@ public class DebugDataSubscription {
output_debug_data = output_debug_data+intermediate_debug_string;
Logger.getGlobal().log(info_logging_level,"Debug data generated:\n"+output_debug_data);
BrokerPublisher publisher = new BrokerPublisher(debug_data_output_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location);
//Only try to publish data if details of the broker subscription are known
if (!broker_subscription_details.getBroker_ip().equals(EMPTY)&&
!broker_subscription_details.getBroker_password().equals(EMPTY)&&
!broker_subscription_details.getBroker_username().equals(EMPTY)){
BrokerPublisher publisher = new BrokerPublisher(debug_data_output_topic_name, broker_subscription_details.getBroker_ip(),broker_subscription_details.getBroker_username(),broker_subscription_details.getBroker_password(), amq_library_configuration_location);
publisher.publish(output_debug_data);
}
return output_debug_data;
};
public static BrokerSubscriber debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location);
public static BrokerSubscriber debug_data_subscriber;
public static void initiate(String broker_ip_address, String broker_username, String broker_password, DetectorSubcomponent detector) {
CharacterizedThread.create_new_thread(new Runnables.DebugDataRunnable(detector),"debug_data_subscription_thread_" + debug_data_trigger_topic_name,true,detector);
CharacterizedThread.create_new_thread(new Runnables.DebugDataRunnable(detector),"debug_data_subscription_thread_" + debug_data_trigger_topic_name,true,detector, CharacterizedThread.CharacterizedThreadType.persistent_running_detector_thread);
}
}

View File

@ -123,17 +123,17 @@ public class SLOViolationCalculator {
double severity_sum;
if (rule_type.equals(SLOSubRule.RuleType.greater_than_rule)) {
severity_sum = (predictionAttribute.getDelta_for_greater_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
severity_sum = (predictionAttribute.getDelta_for_greater_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100); //dividing by 10000 to normalize;
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_greater_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + severity_sum);
}else if (rule_type.equals(SLOSubRule.RuleType.less_than_rule)){
severity_sum = (predictionAttribute.getDelta_for_less_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
severity_sum = (predictionAttribute.getDelta_for_less_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100); //dividing by 10000 to normalize;
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_less_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + severity_sum);
}else if (rule_type.equals(SLOSubRule.RuleType.equal_rule)){
double greater_than_severity_sum = (predictionAttribute.getDelta_for_greater_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
double greater_than_severity_sum = (predictionAttribute.getDelta_for_greater_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100); //dividing by 10000 to normalize;
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a greater-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_greater_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + greater_than_severity_sum);
double less_than_severity_sum = (predictionAttribute.getDelta_for_less_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100 * 100); //dividing by 10000 to normalize;
double less_than_severity_sum = (predictionAttribute.getDelta_for_less_than_rule() * predictionAttribute.getProbability_confidence() * (100 - predictionAttribute.getNormalizedConfidenceIntervalWidth() / 100)) / (100 * 100); //dividing by 10000 to normalize;
Logger.getGlobal().log(info_logging_level, "The prconf-delta attribute severity for a less-than rule related to attribute " + predictionAttribute.getName() + " based on a (prconf,delta,confidence_interval) triplet of (" + predictionAttribute.getProbability_confidence() + "," + predictionAttribute.getDelta_for_less_than_rule() + "," + predictionAttribute.getConfidence_interval_width() + ") is " + less_than_severity_sum);
severity_sum = Math.max(less_than_severity_sum,greater_than_severity_sum);

View File

@ -8,53 +8,68 @@ import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import java.lang.reflect.Array;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import static configuration.Constants.*;
import static java.lang.Thread.sleep;
import static java.util.logging.Level.INFO;
import static utilities.DebugDataSubscription.debug_data_output_topic_name;
public class BrokerPublisher {
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
private Publisher private_publisher_instance;
private ArrayList<Publisher> publishers = new ArrayList<>();
private Connector active_connector;
private String topic;
private String broker_ip;
public BrokerPublisher(String topic, String brokerIpUrl, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
public BrokerPublisher(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerPublisher){
return;
}
boolean publisher_configuration_changed;
if (!broker_and_topics_to_publish_to.containsKey(brokerIpUrl)){
if (!broker_and_topics_to_publish_to.containsKey(broker_ip)){
HashSet<String> topics_to_publish_to = new HashSet<>();
topics_to_publish_to.add(debug_data_output_topic_name);
topics_to_publish_to.add(topic_for_severity_announcement);
topics_to_publish_to.add(slo_rules_topic);
//topics_to_publish_to.add(slo_rules_topic);
topics_to_publish_to.add(topic);
broker_and_topics_to_publish_to.put(brokerIpUrl,new HashSet<>());
broker_and_topics_to_publish_to.put(broker_ip,topics_to_publish_to);
publisher_configuration_changed = true;
}else{
if (!broker_and_topics_to_publish_to.get(brokerIpUrl).contains(topic)){
broker_and_topics_to_publish_to.get(brokerIpUrl).add(topic);
if (!broker_and_topics_to_publish_to.get(broker_ip).contains(topic)){
broker_and_topics_to_publish_to.get(broker_ip).add(topic);
publisher_configuration_changed = true;
}
else{
publisher_configuration_changed = false;
}
}
if (publisher_configuration_changed){
for (String broker_ip : broker_and_topics_to_publish_to.keySet()){
ArrayList<Publisher> publishers = new ArrayList<>();
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
publishers.clear();
for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
Publisher current_publisher = new Publisher(slovid_publisher_key,broker_topic,true);
publishers.add(current_publisher);
if (broker_ip.equals(brokerIpUrl) && broker_topic.equals(topic)){
this.private_publisher_instance = current_publisher;
//ArrayList<Publisher> publishers = new ArrayList<>();
Publisher publisher = new Publisher(slovid_publisher_key + NAME_SEPARATOR +broker_topic, broker_topic, true, true);
publishers.add(publisher);
if (broker_topic.equals(topic)){
this.private_publisher_instance = publishers.get(publishers.size()-1);
this.topic = broker_topic;
this.broker_ip = broker_ip;
}
}
Connector connector = new Connector("slovid",
new ConnectorHandler() {
}, publishers
//CustomConnectorHandler custom_handler = new CustomConnectorHandler();
active_connector = new Connector("slovid"
, new ConnectorHandler() {}
, publishers
, List.of(),
false,
false,
@ -62,11 +77,15 @@ public class BrokerPublisher {
broker_ip,
5672,
brokerUsername,
brokerPassword
brokerPassword,
60,
EMPTY
)
);
connector.start();
}
active_connector.start();
//Logger.getGlobal().log(INFO,"Sending from EXTERIOR");
//private_publisher_instance.send(new JSONObject());
}
}
@ -77,8 +96,12 @@ public class BrokerPublisher {
try{
json_object = (JSONObject) parser.parse(json_string_content);
}catch (ParseException p){
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
Logger.getGlobal().log(Level.WARNING,"Could not parse the string content to be published to the broker as json");
}
if (private_publisher_instance!=null) {
private_publisher_instance.send(json_object);
}else{
Logger.getGlobal().log(Level.SEVERE,"Could not send message to AMQP broker, as the broker ip to be used has not been specified");
}
}
}

View File

@ -1,29 +1,27 @@
package utility_beans;
import eu.nebulouscloud.exn.Connector;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import org.apache.qpid.protonj2.client.Message;
import org.json.simple.JSONValue;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.logging.Level;
import java.util.logging.Logger;
import static configuration.Constants.slovid_publisher_key;
import static configuration.Constants.*;
import static java.util.logging.Level.INFO;
public class BrokerSubscriber {
private static class MessageProcessingHandler extends Handler{
private static final BiFunction temporary_function = (Object o, Object o2) -> {
//System.out.println("");
Logger.getGlobal().log(Level.INFO,"REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
Logger.getGlobal().log(INFO,"REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
return "IN_PROCESSING";
};
private BiFunction<String,String,String> processing_function;
@ -51,7 +49,19 @@ public class BrokerSubscriber {
private String broker_ip;
private String brokerUsername;
private String brokerPassword;
public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation){
boolean able_to_initialize_BrokerSubscriber = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
if (!able_to_initialize_BrokerSubscriber){
try {
throw new Exception("Unable to initialize Subscriber");
} catch (Exception e) {
String message = "Topic is "+topic+" broker ip is "+broker_ip+" broker username/pass are "+brokerUsername+","+brokerPassword;
Logger.getGlobal().log(INFO,message);
throw new RuntimeException(e);
}
}
boolean subscriber_configuration_changed;
if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)){
HashSet<String> topics_to_subscribe_to = new HashSet<>();
@ -74,7 +84,7 @@ public class BrokerSubscriber {
}
}
if (subscriber_configuration_changed){
Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler());
Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(),true,true);
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,current_consumer);
this.topic = topic;
@ -105,7 +115,9 @@ public class BrokerSubscriber {
broker_ip,
5672,
brokerUsername,
brokerPassword
brokerPassword,
60,
EMPTY
)
);
extended_connector.start();
@ -119,47 +131,55 @@ public class BrokerSubscriber {
}
}
public void subscribe(BiFunction<String, String, String> function, AtomicBoolean stop_signal) {
Logger.getGlobal().log(Level.INFO,"ESTABLISHING SUBSCRIPTION");
public int subscribe(BiFunction function, AtomicBoolean stop_signal) {
int exit_status = -1;
Logger.getGlobal().log(INFO,"ESTABLISHING SUBSCRIPTION for "+topic);
//First remove any leftover consumer
if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)) {
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
remove_topic_from_broker_connector(topic);
}else{
active_consumers_per_topic_per_broker_ip.put(broker_ip,new HashMap<>());
}
//Then add the new consumer
Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function));
Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function),true,true);
new_consumer.setProperty("topic",topic);
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,new_consumer);
add_topic_consumer_to_broker_connector(new_consumer);
Logger.getGlobal().log(Level.INFO,"ESTABLISHED SUBSCRIPTION to topic "+topic);
Logger.getGlobal().log(INFO,"ESTABLISHED SUBSCRIPTION to topic "+topic);
synchronized (stop_signal){
while (!stop_signal.get()){
try{
stop_signal.wait();
}catch (Exception e){
e.printStackTrace();
Logger.getGlobal().log(Level.WARNING,e.toString()+" in thread "+Thread.currentThread().getName());
break;
}
}
Logger.getGlobal().log(Level.INFO,"Stopping subscription for broker "+broker_ip+" and topic "+topic);
Logger.getGlobal().log(INFO,"Stopping subscription for broker "+broker_ip+" and topic "+topic + "at thread "+Thread.currentThread().getName());
stop_signal.set(false);
}
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
remove_topic_from_broker_connector(topic);
exit_status=0;
return exit_status;
}
public enum EventFields{
;
public enum PredictionMetricEventFields {timestamp, prediction_time, probability, metric_value, confidence_interval}
public enum PredictionMetricEventFields {timestamp, predictionTime, probability, metricValue, confidence_interval}
}
public static class TopicNames{
public static String realtime_metric_values_topic(String metric) {
return null;
return realtime_metrics_topic+metric;
}
public static String final_metric_predictions_topic(String metric) {
return null;
return final_metric_prediction_topic+metric;
}
}
}

View File

@ -0,0 +1,47 @@
package utility_beans;
import static configuration.Constants.EMPTY;
public class BrokerSubscriptionDetails {
String broker_username = "admin";
String broker_password = "admin";
String broker_ip = "localhost";
public BrokerSubscriptionDetails(String broker_ip, String broker_username, String broker_password) {
this.broker_ip = broker_ip;
this.broker_username = broker_username;
this.broker_password = broker_password;
}
public BrokerSubscriptionDetails(boolean fake_broker_subscription) {
if (fake_broker_subscription) {
this.broker_username = EMPTY;
this.broker_password = EMPTY;
this.broker_ip = EMPTY;
}
}
public String getBroker_username() {
return broker_username;
}
public void setBroker_username(String broker_username) {
this.broker_username = broker_username;
}
public String getBroker_password() {
return broker_password;
}
public void setBroker_password(String broker_password) {
this.broker_password = broker_password;
}
public String getBroker_ip() {
return broker_ip;
}
public void setBroker_ip(String broker_ip) {
this.broker_ip = broker_ip;
}
}

View File

@ -7,6 +7,7 @@ import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
import java.util.logging.Level;
import java.util.logging.Logger;
import static configuration.Constants.NAME_SEPARATOR;
import static utility_beans.CharacterizedThread.CharacterizedThreadType.*;
public class CharacterizedThread{
@ -18,19 +19,19 @@ public class CharacterizedThread{
attached,detached
}
public static Thread create_new_thread(Runnable runnable, String thread_name, boolean start_thread_now, SLOViolationDetectorSubcomponent subcomponent){
public static Thread create_new_thread(Runnable runnable, String thread_name, boolean start_thread_now, SLOViolationDetectorSubcomponent subcomponent,CharacterizedThreadType thread_type){
Thread thread = new Thread(runnable);
thread.setName(thread_name);
if (subcomponent.thread_type.equals(slo_bound_running_thread)){
thread.setName(thread_name+ NAME_SEPARATOR +subcomponent.get_name());
if (thread_type.equals(slo_bound_running_thread)){
try {
((DetectorSubcomponent)subcomponent).getSubcomponent_state().slo_bound_running_threads.put(thread_name, thread);
}catch (NullPointerException n){
n.printStackTrace();
Logger.getGlobal().log(Level.SEVERE,"Although the thread type for thread "+thread_name+" was declared to be an slo_bound_running_thread, no detector subcomponent was related to it");
}
}else if (subcomponent.thread_type.equals(persistent_running_director_thread)){
}else if (thread_type.equals(persistent_running_director_thread)){
((DirectorSubcomponent) subcomponent).persistent_running_director_threads.put(thread_name,thread);
}else if (subcomponent.thread_type.equals(persistent_running_detector_thread)){
}else if (thread_type.equals(persistent_running_detector_thread)){
((DetectorSubcomponent)subcomponent).getSubcomponent_state().persistent_running_detector_threads.put(thread_name, thread);
}else{
Logger.getGlobal().log(Level.WARNING,"Undefined type of thread for thread with name: "+thread_name);

View File

@ -39,7 +39,9 @@ public class CustomDataPublisher {
broker_ip,
5672,
brokerUsername,
brokerPassword
brokerPassword,
60,
EMPTY
)
);
connector.start();

View File

@ -144,13 +144,13 @@ public class UnboundedMonitoringAttributeTests {
String predicted_attribute_name = topic.replaceFirst("prediction\\.",EMPTY);
HashMap<Integer, HashMap<Long, PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
try {
double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.metric_value)).doubleValue();
double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.metricValue.name())).doubleValue();
double probability_confidence = 100*((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.probability)).doubleValue();
//double confidence_interval = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.confidence_interval)).doubleValue();
JSONArray json_array_confidence_interval = ((JSONArray)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.confidence_interval));
double confidence_interval = ((Number)json_array_confidence_interval.get(1)).doubleValue() - ((Number)json_array_confidence_interval.get(0)).doubleValue();
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue();
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.prediction_time)).longValue();
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.predictionTime.name())).longValue();
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
synchronized (detector.can_modify_slo_rules) {
@ -260,7 +260,7 @@ public class UnboundedMonitoringAttributeTests {
forecasted_metric_json_object.put("metricValue", forecasted_metric_value);
forecasted_metric_json_object.put("timestamp",System.currentTimeMillis());
forecasted_metric_json_object.put("probability",probability);
forecasted_metric_json_object.put(EventFields.PredictionMetricEventFields.prediction_time,targeted_prediction_time);
forecasted_metric_json_object.put(EventFields.PredictionMetricEventFields.predictionTime.name(),targeted_prediction_time);
//((System.currentTimeMillis()/1000)%60)*60000+1); //The prediction supposedly reflects the metric values at the next minute
JSONArray confidence_interval_list = new JSONArray();
confidence_interval_list.add((forecasted_metric_value-confidence_interval/2));