Code rebasing
Rebasing of the code of the component, including major changes Changes in the naming of the topics used for the communication of the component Better information propagation during the creation of a Broker subscriber processing function Better handling of upper and lower metric (attribute) bounds Acquisition of broker credential information from the user Improvement of number handling within metrics (allow a metric to have integer or real values) Improvement of the logic related to the initiation of a new SLO violation detection engine Transferral of generic topic subscriptions (application-independent logic) to the code of the Director Subcomponent Improvements to the CustomDataPublisher Better handling of paths for the properties files necessary for the operation of the component Improvement on the handling of infinity Implementation of stopping functionality for AMQP connectors through ExtendedConnector Changes by specifying explicitly the names of topics Introduction of GUI subscriber utility and improvements on GUI publisher Improvements in Debug data generated Changes in REST calls to use application name Improvements in the unregistration of consumers and stopping of an ExtendedConnector Improvements in the initialization of the component Addition of capability to export executable jar Fixed a case when the active connector could not be stopped because it was not non-null. Removed unecessary code, and jar-generating code. Allow setting custom port, and set the correct host to contact the activemq broker Change-Id: I94a6fdb4612de192c24511445f1236cdce94b402
This commit is contained in:
parent
ac4d0744b5
commit
074b1d4a7b
@ -13,6 +13,20 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
<!-- Uncomment to allow the creation of a Java Sender
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifest>
|
||||
<addClasspath>true</addClasspath>
|
||||
<mainClass>utility_beans.broker_communication.CustomDataSubscriber</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
-->
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
@ -88,11 +102,23 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-tomcat</artifactId>
|
||||
</dependency>
|
||||
<!-- <dependency>
|
||||
<groupId>gr.ntua.imu.morphemic</groupId>
|
||||
<artifactId>amq-message-java-library</artifactId>
|
||||
<version>4.0.0-SNAPSHOT</version>
|
||||
</dependency> !-->
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.tomcat.embed/tomcat-embed-jasper -->
|
||||
<dependency>
|
||||
<groupId>org.apache.tomcat.embed</groupId>
|
||||
<artifactId>tomcat-embed-jasper</artifactId>
|
||||
<version>11.0.0-M16</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/jakarta.servlet.jsp.jstl/jakarta.servlet.jsp.jstl-api -->
|
||||
<dependency>
|
||||
<groupId>jakarta.servlet.jsp.jstl</groupId>
|
||||
<artifactId>jakarta.servlet.jsp.jstl-api</artifactId>
|
||||
<version>3.0.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.googlecode.json-simple</groupId>
|
||||
<artifactId>json-simple</artifactId>
|
||||
@ -132,7 +158,6 @@
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.13.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
@ -23,21 +23,19 @@ public class Constants {
|
||||
public static int maximum_acceptable_forward_predictions;
|
||||
public static String [] logic_operators = {"and","or"};
|
||||
public static final String default_application_name = "default_application";
|
||||
public static final String slovid_publisher_key = "slovid_publisher";
|
||||
public static final String slovid_subscriber_key = "slovid_publisher";
|
||||
public static URI base_project_path;
|
||||
public static String configuration_file_location = "slo-violation-detector/src/main/resources/config/eu.nebulous.slo_violation_detector.properties";
|
||||
public static String amq_library_configuration_location = "slo-violation-detector/src/main/resources/config/eu.melodic.event.brokerclient.properties";
|
||||
public static String topic_for_severity_announcement = "eu.nebulouscloud.monitoring.slo.severity_value";
|
||||
public static String topic_for_lost_device_announcement = "eu.nebulouscloud.device_lost";
|
||||
public static String topic_for_lost_device_announcement = "eu.nebulouscloud.monitoring.device_lost";
|
||||
public static String slo_rules_topic = "eu.nebulouscloud.monitoring.slo.new";
|
||||
public static String metric_list_topic = "eu.nebulouscloud.monitoring.metric_list";
|
||||
public static String topic_prefix_realtime_metrics = "eu.nebulouscloud.monitoring.realtime.";
|
||||
public static String topic_prefix_final_predicted_metrics = "eu.nebulouscloud.monitoring.predicted.";
|
||||
public static String nebulous_components_application = "nebulous_components_application";
|
||||
public static double slo_violation_probability_threshold = 0.5; //The threshold over which the probability of a predicted slo violation should be to have a violation detection
|
||||
public static int kept_values_per_metric = 5; //Default to be overriden from the configuration file. This indicates how many metric values are kept to calculate the "previous" metric value during the rate of change calculation
|
||||
public static String roc_calculation_mode = "prototype";
|
||||
public static boolean self_publish_rule_file = false; //default value to be overriden
|
||||
public static boolean single_slo_rule_active = true; //default value to be overriden
|
||||
public static double roc_limit = 1;
|
||||
public static double epsilon = 0.00000000001;
|
||||
|
@ -12,15 +12,16 @@ package metric_retrieval;
|
||||
//import eu.melodic.event.brokerclient.templates.EventFields;
|
||||
//import eu.melodic.event.brokerclient.templates.TopicNames;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
import utility_beans.BrokerSubscriber.EventFields;
|
||||
import utility_beans.BrokerSubscriber.TopicNames;
|
||||
import utility_beans.broker_communication.BrokerSubscriber;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import slo_rule_modelling.SLORule;
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.HashMap;
|
||||
@ -29,19 +30,19 @@ import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
|
||||
public class AttributeSubscription {
|
||||
SLORule slo_rule;
|
||||
|
||||
public AttributeSubscription(SLORule slo_rule, String broker_ip_address, String broker_username, String broker_password){
|
||||
public AttributeSubscription(SLORule slo_rule, String broker_ip_address,int broker_port, String broker_username, String broker_password){
|
||||
this.slo_rule = slo_rule;
|
||||
DetectorSubcomponent detector = slo_rule.getAssociated_detector();
|
||||
for (String metric:slo_rule.get_monitoring_attributes()){
|
||||
|
||||
String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting realtime subscription at "+realtime_metric_topic_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name());
|
||||
String realtime_metric_topic_name = topic_prefix_realtime_metrics+metric;
|
||||
Logger.getGlobal().log(info_logging_level,"Starting realtime subscription at "+realtime_metric_topic_name + " for application "+detector.get_application_name());
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address,broker_port,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name());
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> function = (broker_details, message) ->{
|
||||
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(metric)) {
|
||||
try {
|
||||
@ -60,7 +61,7 @@ public class AttributeSubscription {
|
||||
};
|
||||
Runnable realtime_subscription_runnable = () -> {
|
||||
try {
|
||||
subscriber.subscribe(function, detector.stop_signal);
|
||||
subscriber.subscribe(function, detector.get_application_name(),detector.stop_signal);
|
||||
if(Thread.interrupted()){
|
||||
throw new InterruptedException();
|
||||
}
|
||||
@ -78,19 +79,19 @@ public class AttributeSubscription {
|
||||
|
||||
|
||||
|
||||
String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric);
|
||||
String forecasted_metric_topic_name = topic_prefix_final_predicted_metrics+metric;
|
||||
Logger.getGlobal().log(info_logging_level,"Starting forecasted metric subscription at "+forecasted_metric_topic_name);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name());
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_port,broker_username,broker_password, amq_library_configuration_location,detector.get_application_name());
|
||||
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> forecasted_function = (broker_details,message) ->{
|
||||
String predicted_attribute_name = forecasted_metric_topic_name.replaceFirst("eu\\.nebulouscloud\\.monitoring\\.predicted\\.",EMPTY);
|
||||
HashMap<Integer, HashMap<Long,PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> forecasted_function = (broker_details, message) ->{
|
||||
String predicted_attribute_name = forecasted_metric_topic_name.replaceFirst(topic_prefix_final_predicted_metrics,EMPTY);
|
||||
HashMap<Integer, HashMap<Long, PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
|
||||
try {
|
||||
JSONObject json_message = (JSONObject)(new JSONParser().parse(message));
|
||||
Logger.getGlobal().log(Level.INFO,"Getting information for "+EventFields.PredictionMetricEventFields.metricValue);
|
||||
double forecasted_value = ((Number)json_message.get(EventFields.PredictionMetricEventFields.metricValue.name())).doubleValue();
|
||||
double probability_confidence = 100*((Number)json_message.get(EventFields.PredictionMetricEventFields.probability.name())).doubleValue();
|
||||
JSONArray json_array_confidence_interval = (JSONArray)(json_message.get(EventFields.PredictionMetricEventFields.confidence_interval.name()));
|
||||
Logger.getGlobal().log(Level.INFO,"Getting information for metricValue");
|
||||
double forecasted_value = ((Number)json_message.get("metricValue")).doubleValue();
|
||||
double probability_confidence = 100*((Number)json_message.get("probability")).doubleValue();
|
||||
JSONArray json_array_confidence_interval = (JSONArray)(json_message.get("confidence_interval"));
|
||||
|
||||
double confidence_interval;
|
||||
try{
|
||||
@ -100,8 +101,8 @@ public class AttributeSubscription {
|
||||
c.printStackTrace();
|
||||
confidence_interval = Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
long timestamp = ((Number)json_message.get(EventFields.PredictionMetricEventFields.timestamp.name())).longValue();
|
||||
long targeted_prediction_time = ((Number)json_message.get(EventFields.PredictionMetricEventFields.predictionTime.name())).longValue();
|
||||
long timestamp = ((Number)json_message.get("timestamp")).longValue();
|
||||
long targeted_prediction_time = ((Number)json_message.get("predictionTime")).longValue();
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+predicted_attribute_name+" equal to "+ forecasted_value);
|
||||
|
||||
|
||||
@ -178,7 +179,7 @@ public class AttributeSubscription {
|
||||
try {
|
||||
synchronized (detector.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name).getValue())
|
||||
forecasted_subscriber.subscribe(forecasted_function,detector.stop_signal);
|
||||
forecasted_subscriber.subscribe(forecasted_function,detector.get_application_name(),detector.stop_signal);
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
|
@ -2,22 +2,21 @@ package runtime;
|
||||
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.BrokerSubscriptionDetails;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
|
||||
import static configuration.Constants.EMPTY;
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static runtime.Main.detectors;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_integer_id;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
|
||||
import static utilities.DebugDataSubscription.*;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
import static utility_beans.generic_component_functionality.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
public class DetectorRequestMappings {
|
||||
|
||||
@RequestMapping("/add-new-detector")
|
||||
public static String start_new_detector_subcomponent() {
|
||||
detectors.put(default_application_name,new DetectorSubcomponent(default_application_name,detached));
|
||||
@RequestMapping("/add-new-detector/{application_name}")
|
||||
public static String start_new_detector_subcomponent(@PathVariable String application_name) {
|
||||
detectors.put(application_name,new DetectorSubcomponent(application_name,detached));
|
||||
return ("Spawned new SLO Detector subcomponent instance! Currently, there have been "+detector_integer_id+" detectors spawned");
|
||||
}
|
||||
|
||||
@ -27,9 +26,9 @@ public class DetectorRequestMappings {
|
||||
debug_data_generation.apply(new BrokerSubscriptionDetails(false),EMPTY);
|
||||
return "Debug data generation was successful";
|
||||
}
|
||||
@GetMapping("/component-statistics/detectors/{id}")
|
||||
public static String get_detector_subcomponent_statistics(@PathVariable String id) {
|
||||
String detector_name = "detector_"+id;
|
||||
@GetMapping("/component-statistics/detectors/{application_name}")
|
||||
public static String get_detector_subcomponent_statistics(@PathVariable String application_name) {
|
||||
String detector_name = "detector_"+application_name;
|
||||
debug_data_generation.apply(detector_subcomponents.get(detector_name).getBrokerSubscriptionDetails(debug_data_trigger_topic_name),EMPTY);
|
||||
return DetectorSubcomponent.get_detector_subcomponent_statistics();
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
package runtime;
|
||||
|
||||
import configuration.Constants;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
@ -11,9 +10,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.director.DirectorSubcomponent;
|
||||
import utility_beans.BrokerSubscriptionDetails;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
@ -21,21 +20,23 @@ import static configuration.Constants.*;
|
||||
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.*;
|
||||
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api")
|
||||
public class DirectorRequestMappings {
|
||||
@PostMapping(value = "/new-application-slo",
|
||||
consumes = APPLICATION_JSON_VALUE)
|
||||
public static String start_new_detector_subcomponent(@RequestBody String string_rule_representation){
|
||||
JSONObject rule_representation_json;
|
||||
/* JSONObject rule_representation_json;
|
||||
JSONParser json_parser = new JSONParser();
|
||||
String application_name;
|
||||
try {
|
||||
rule_representation_json = (JSONObject) json_parser.parse(string_rule_representation);
|
||||
application_name = (String) rule_representation_json.get("name");
|
||||
} catch (ParseException e) {
|
||||
return "Error in parsing the input string, the exception message follows:\n"+e;
|
||||
}
|
||||
BrokerSubscriptionDetails broker_details = new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password,nebulous_components_application,slo_rules_topic);
|
||||
}*/
|
||||
BrokerSubscriptionDetails broker_details = new BrokerSubscriptionDetails(broker_ip,broker_username,broker_password,EMPTY,slo_rules_topic);
|
||||
DirectorSubcomponent.slo_rule_topic_subscriber_function.apply(broker_details,string_rule_representation);
|
||||
return ("New application was spawned");
|
||||
}
|
||||
|
@ -8,14 +8,11 @@
|
||||
|
||||
package runtime;
|
||||
|
||||
//import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
//import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.director.DirectorSubcomponent;
|
||||
import slo_violation_detector_engine.generic.ComponentState.*;
|
||||
import utility_beans.*;
|
||||
import utility_beans.generic_component_functionality.OperationalMode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@ -28,8 +25,7 @@ import static java.util.logging.Level.INFO;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.*;
|
||||
import static utilities.OperationalModeUtils.getSLOViolationDetectionOperationalMode;
|
||||
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
|
||||
import static utilities.OperationalModeUtils.get_director_subscription_topics;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
import static utility_beans.generic_component_functionality.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
|
||||
|
||||
@SpringBootApplication
|
||||
@ -47,7 +43,6 @@ public class Main {
|
||||
// - The prediction confidence
|
||||
|
||||
//The above functionality is carried out by a subcomponent of the SLO Violation Detector which is the Detector. There is at least one Detector in each SLO Violation Detector, but there is also one Director responsible for guiding the Detector(s).
|
||||
|
||||
try {
|
||||
{
|
||||
InputStream inputStream;
|
||||
@ -55,19 +50,19 @@ public class Main {
|
||||
operational_mode = getSLOViolationDetectionOperationalMode("DIRECTOR");
|
||||
inputStream = getPreferencesFileInputStream(EMPTY);
|
||||
} else if (args.length == 1) {
|
||||
Logger.getGlobal().log(info_logging_level, "Operational mode has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[0]);
|
||||
inputStream = getPreferencesFileInputStream(EMPTY);
|
||||
Logger.getGlobal().log(info_logging_level, "Preferences file has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode("DIRECTOR");
|
||||
inputStream = getPreferencesFileInputStream(args[0]);
|
||||
} else {
|
||||
Logger.getGlobal().log(info_logging_level, "Operational mode and preferences file has been manually specified");
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[0]);
|
||||
inputStream = getPreferencesFileInputStream(args[1]);
|
||||
inputStream = getPreferencesFileInputStream(args[0]);
|
||||
operational_mode = getSLOViolationDetectionOperationalMode(args[1]);
|
||||
|
||||
}
|
||||
prop.load(inputStream);
|
||||
slo_rules_topic = prop.getProperty("slo_rules_topic");
|
||||
kept_values_per_metric = Integer.parseInt(prop.getProperty("stored_values_per_metric", "5"));
|
||||
self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file"));
|
||||
//TODO remove from docs as well: self_publish_rule_file = Boolean.parseBoolean(prop.getProperty("self_publish_rule_file"));
|
||||
single_slo_rule_active = Boolean.parseBoolean(prop.getProperty("single_slo_rule_active"));
|
||||
time_horizon_seconds = Integer.parseInt(prop.getProperty("time_horizon_seconds"));
|
||||
|
||||
@ -76,21 +71,27 @@ public class Main {
|
||||
maximum_acceptable_forward_predictions = Integer.parseInt(prop.getProperty("maximum_acceptable_forward_predictions"));
|
||||
|
||||
broker_ip = prop.getProperty("broker_ip_url");
|
||||
broker_port = Integer.parseInt(prop.getProperty("broker_port"));
|
||||
broker_username = prop.getProperty("broker_username");
|
||||
broker_password = prop.getProperty("broker_password");
|
||||
unbounded_metric_strings = new ArrayList<>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
|
||||
//TODO Delete below two lines
|
||||
//DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,detached);
|
||||
//detectors.put(default_application_name,detector);
|
||||
|
||||
|
||||
//director_subscription_topics = get_director_subscription_topics();
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(default_application_name,detached);
|
||||
detectors.put(default_application_name,detector);
|
||||
ArrayList<String> unbounded_metric_strings = new ArrayList<>(Arrays.asList(prop.getProperty("metrics_bounds").split(",")));
|
||||
for (String metric_string : unbounded_metric_strings) {
|
||||
detector.getSubcomponent_state().getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";", 2)[1]); //TODO delete once this information is successfully received from the AMQP broker
|
||||
}
|
||||
} //initialization
|
||||
if (operational_mode.equals(OperationalMode.DETECTOR)) {
|
||||
Logger.getGlobal().log(INFO,"Started new Detector instance"); //This detector instance has been already started in the initialization block above as it will be commonly needed both for the plain Detector and the Director-Detector
|
||||
if (args.length>2){
|
||||
DetectorSubcomponent detector = new DetectorSubcomponent(args[args.length - 1],detached);
|
||||
detectors.put(args[args.length-1],detector);
|
||||
}else{
|
||||
Logger.getGlobal().log(severe_logging_level,"Error, wanted to start the component as a detector but the application name was not provided");
|
||||
}
|
||||
Logger.getGlobal().log(INFO,"Started new Detector instance");
|
||||
}else if (operational_mode.equals(OperationalMode.DIRECTOR)){
|
||||
Logger.getGlobal().log(INFO,"Starting new Director along the new Detector instance");
|
||||
Logger.getGlobal().log(INFO,"Starting new Director instance");
|
||||
DirectorSubcomponent director = new DirectorSubcomponent();
|
||||
SpringApplication.run(Main.class, args);
|
||||
Logger.getGlobal().log(INFO,"Execution completed");
|
||||
|
@ -15,8 +15,8 @@ import org.json.simple.parser.ParseException;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utilities.MathUtils;
|
||||
import utilities.SLOViolationCalculator;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -28,7 +28,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static slo_rule_modelling.SLOSubRule.find_rule_type;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
|
||||
public class SLORule {
|
||||
private DetectorSubcomponent associated_detector;
|
||||
@ -37,6 +37,7 @@ public class SLORule {
|
||||
private ArrayList<String> monitoring_attributes = new ArrayList<>();
|
||||
private JSONObject rule_representation;
|
||||
private SLOFormatVersion rule_format;
|
||||
private String associated_application_name;
|
||||
|
||||
public DetectorSubcomponent getAssociated_detector() {
|
||||
return associated_detector;
|
||||
@ -59,6 +60,7 @@ public class SLORule {
|
||||
this.rule_format = find_rule_format(this.rule_representation);
|
||||
this.slo_subrules = parse_subrules(detector,this.rule_representation,this.rule_format);
|
||||
this.associated_detector = detector;
|
||||
this.associated_application_name = detector.get_application_name();
|
||||
}
|
||||
public SLORule(String rule_representation, ArrayList<String> metric_list, DetectorSubcomponent associated_detector){
|
||||
for (String metric: metric_list) {
|
||||
@ -77,6 +79,7 @@ public class SLORule {
|
||||
this.rule_format = find_rule_format(this.rule_representation);
|
||||
this.slo_subrules = parse_subrules(associated_detector,this.rule_representation,this.rule_format);
|
||||
this.associated_detector = associated_detector;
|
||||
this.associated_application_name = associated_detector.get_application_name();
|
||||
}
|
||||
|
||||
private static SLOFormatVersion find_rule_format(JSONObject rule_representation) {
|
||||
@ -336,4 +339,12 @@ public class SLORule {
|
||||
public void setRule_format(SLOFormatVersion rule_format) {
|
||||
this.rule_format = rule_format;
|
||||
}
|
||||
|
||||
public String getAssociated_application_name() {
|
||||
return associated_application_name;
|
||||
}
|
||||
|
||||
public void setAssociated_application_name(String associated_application_name) {
|
||||
this.associated_application_name = associated_application_name;
|
||||
}
|
||||
}
|
||||
|
@ -9,7 +9,7 @@
|
||||
package slo_rule_modelling;
|
||||
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -3,7 +3,12 @@ package slo_violation_detector_engine.detector;
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import slo_violation_detector_engine.generic.Runnables;
|
||||
import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
import utility_beans.synchronization.SynchronizedBoolean;
|
||||
import utility_beans.synchronization.SynchronizedBooleanMap;
|
||||
import utility_beans.synchronization.SynchronizedInteger;
|
||||
|
||||
|
||||
import java.util.Collections;
|
||||
@ -14,8 +19,9 @@ import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.prop;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.attached;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.aggregate_metric_values;
|
||||
import static slo_violation_detector_engine.generic.ComponentState.unbounded_metric_strings;
|
||||
import static utility_beans.generic_component_functionality.CharacterizedThread.CharacterizedThreadRunMode.attached;
|
||||
import static utility_beans.monitoring.RealtimeMonitoringAttribute.aggregate_metric_values;
|
||||
|
||||
|
||||
public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
@ -56,6 +62,9 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
handled_application_name = application_name;
|
||||
detector_name = "detector_"+application_name+"_"+current_detector_id;
|
||||
}
|
||||
for (String metric_string : unbounded_metric_strings) {
|
||||
subcomponent_state.getMonitoring_attributes_bounds_representation().put(metric_string.split(";")[0], metric_string.split(";", 2)[1]); //TODO delete once this information is successfully received from the AMQP broker
|
||||
}
|
||||
if (characterized_thread_run_mode.equals(attached)) {
|
||||
DetectorSubcomponentUtilities.run_slo_violation_detection_engine(this);
|
||||
}else/*detached mode*/{
|
||||
@ -145,7 +154,7 @@ public class DetectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
public static DetectorSubcomponent get_associated_detector(String application_name){
|
||||
DetectorSubcomponent associated_detector = detector_subcomponents.get(application_name);
|
||||
if (associated_detector==null){
|
||||
if (detector_subcomponents.size()==1 && detector_subcomponents.get(default_application_name)!=null){//This means only the initial 'default' application exists
|
||||
if (detector_subcomponents.size()==1 && detector_subcomponents.get(default_application_name)!=null){//This means only the initial 'default_application' application exists
|
||||
associated_detector = detector_subcomponents.get(default_application_name);
|
||||
associated_detector.set_name(application_name);
|
||||
}
|
||||
|
@ -2,15 +2,13 @@ package slo_violation_detector_engine.detector;
|
||||
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import slo_rule_modelling.SLORule;
|
||||
import utility_beans.MonitoringAttributeStatistics;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
import utility_beans.monitoring.MonitoringAttributeStatistics;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
||||
import static configuration.Constants.kept_values_per_metric;
|
||||
|
||||
public class DetectorSubcomponentState{
|
||||
private HashMap<String, MonitoringAttributeStatistics> monitoring_attributes_statistics = new HashMap<>();
|
||||
private HashMap<String, MonitoringAttributeStatistics> monitoring_attributes_roc_statistics = new HashMap<>();
|
||||
|
@ -9,15 +9,9 @@ import slo_rule_modelling.SLORule;
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import utilities.DebugDataSubscription;
|
||||
import utilities.MonitoringAttributeUtilities;
|
||||
import utility_beans.BrokerPublisher;
|
||||
import utility_beans.BrokerSubscriber;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.SynchronizedBoolean;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.synchronization.SynchronizedBoolean;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
@ -28,8 +22,7 @@ import static slo_violation_detector_engine.director.DirectorSubcomponent.MESSAG
|
||||
import static slo_violation_detector_engine.generic.ComponentState.prop;
|
||||
import static slo_violation_detector_engine.generic.Runnables.get_severity_calculation_runnable;
|
||||
import static runtime.Main.*;
|
||||
import static slo_violation_detector_engine.generic.SLOViolationDetectorStateUtils.*;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
|
||||
public class DetectorSubcomponentUtilities {
|
||||
|
||||
@ -117,10 +110,10 @@ public class DetectorSubcomponentUtilities {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static ArrayList<AttributeSubscription> initialize_attribute_subscribers(ArrayList<SLORule> rules_list, String broker_ip_address, String broker_username, String broker_password){
|
||||
public static ArrayList<AttributeSubscription> initialize_attribute_subscribers(ArrayList<SLORule> rules_list, String broker_ip_address,int broker_port, String broker_username, String broker_password){
|
||||
ArrayList<AttributeSubscription> attribute_subscribers = new ArrayList<>();
|
||||
for (SLORule rule:rules_list){
|
||||
attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_username,broker_password));
|
||||
attribute_subscribers.add(new AttributeSubscription(rule,broker_ip_address,broker_port,broker_username,broker_password));
|
||||
}
|
||||
return attribute_subscribers;
|
||||
}
|
||||
@ -298,7 +291,7 @@ public class DetectorSubcomponentUtilities {
|
||||
initialize_monitoring_datastructures_with_empty_data(associated_detector_subcomponent.getSubcomponent_state().slo_rules);
|
||||
//
|
||||
/*associated_detector_subcomponent.getUtilities().*/initialize_subrule_and_attribute_associations(associated_detector_subcomponent.getSubcomponent_state().slo_rules,associated_detector_subcomponent.can_modify_slo_rules);
|
||||
initialize_attribute_subscribers(associated_detector_subcomponent.getSubcomponent_state().slo_rules, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_attribute_subscribers(associated_detector_subcomponent.getSubcomponent_state().slo_rules, prop.getProperty("broker_ip_url"), Integer.parseInt(prop.getProperty("broker_port")), prop.getProperty("broker_username"), prop.getProperty("broker_password"));
|
||||
initialize_slo_processing(associated_detector_subcomponent.getSubcomponent_state().slo_rules);
|
||||
|
||||
}
|
||||
|
@ -4,16 +4,20 @@ import eu.nebulouscloud.exn.Connector;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.generic.SLOViolationDetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
import utility_beans.broker_communication.BrokerPublisher;
|
||||
import utility_beans.broker_communication.BrokerSubscriber;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
import utility_beans.synchronization.SynchronizedBoolean;
|
||||
import utility_beans.synchronization.SynchronizedStringMap;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.text.NumberFormat;
|
||||
import java.time.Clock;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
@ -64,11 +68,11 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
//3. Creation of the lost device subscriber thread, which listens for a new event signalling a lost edge device
|
||||
|
||||
//Metric list subscription thread
|
||||
BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,nebulous_components_application);
|
||||
BrokerSubscriber metric_list_subscriber = new BrokerSubscriber(metric_list_topic, prop.getProperty("broker_ip_url"), Integer.parseInt(prop.getProperty("broker_port")),prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,EMPTY);
|
||||
Runnable metric_list_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = metric_list_subscriber.subscribe(metric_list_subscriber_function, this.stop_signal); //This subscriber should not be immune to stop signals
|
||||
int exit_status = metric_list_subscriber.subscribe(metric_list_subscriber_function, EMPTY,this.stop_signal); //This subscriber should not be immune to stop signals
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level,"Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
@ -87,11 +91,11 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
|
||||
|
||||
//SLO rule subscription thread
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,nebulous_components_application);
|
||||
BrokerSubscriber slo_rule_topic_subscriber = new BrokerSubscriber(slo_rules_topic, prop.getProperty("broker_ip_url"), Integer.parseInt(prop.getProperty("broker_port")),prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location,EMPTY);
|
||||
Runnable slo_rules_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = slo_rule_topic_subscriber.subscribe(slo_rule_topic_subscriber_function, stop_signal); //This subscriber should not be immune to stop signals
|
||||
int exit_status = slo_rule_topic_subscriber.subscribe(slo_rule_topic_subscriber_function, EMPTY,stop_signal); //This subscriber should not be immune to stop signals
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level, "Broker unavailable, will try to reconnect after 10 seconds");
|
||||
try {
|
||||
@ -111,9 +115,9 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
|
||||
|
||||
|
||||
BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location,nebulous_components_application);
|
||||
BrokerSubscriber device_lost_subscriber = new BrokerSubscriber(topic_for_lost_device_announcement, broker_ip, broker_port, broker_username, broker_password, amq_library_configuration_location,EMPTY);
|
||||
BiFunction<BrokerSubscriptionDetails, String, String> device_lost_subscriber_function = (broker_details, message) -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_username, broker_password, amq_library_configuration_location);
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip, broker_port, broker_username, broker_password, amq_library_configuration_location);
|
||||
|
||||
Clock clock = Clock.systemUTC();
|
||||
Long current_time_seconds = (long) Math.floor(clock.millis()/1000.0);
|
||||
@ -121,7 +125,7 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
severity_json.put("severity", 100);
|
||||
severity_json.put("probability", 100);
|
||||
severity_json.put("predictionTime", current_time_seconds);
|
||||
persistent_publisher.publish(severity_json.toJSONString());
|
||||
persistent_publisher.publish(severity_json.toJSONString(), Collections.singleton(EMPTY));
|
||||
|
||||
return topic_for_lost_device_announcement + ":MSG:" + message;
|
||||
};
|
||||
@ -132,7 +136,7 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
Runnable device_lost_topic_subscriber_runnable = () -> {
|
||||
boolean did_not_finish_execution_gracefully = true;
|
||||
while (did_not_finish_execution_gracefully) {
|
||||
int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false)
|
||||
int exit_status = device_lost_subscriber.subscribe(device_lost_subscriber_function, EMPTY,stop_signal); //This subscriber should not be immune to stop signals, else there would be new AtomicBoolean(false)
|
||||
if (exit_status!=0) {
|
||||
Logger.getGlobal().log(info_logging_level, "A device used by the platform was lost, will therefore trigger a reconfiguration");
|
||||
try {
|
||||
@ -150,18 +154,6 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
CharacterizedThread.create_new_thread(device_lost_topic_subscriber_runnable,"device_lost_topic_subscriber_thread",true,this, CharacterizedThread.CharacterizedThreadType.persistent_running_director_thread);
|
||||
|
||||
|
||||
if (self_publish_rule_file) {
|
||||
String json_file_name = prop.getProperty("input_file");
|
||||
String rules_json_string = null;
|
||||
try {
|
||||
rules_json_string = String.join(EMPTY, Files.readAllLines(Paths.get(new File(json_file_name).getAbsolutePath())));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
BrokerPublisher publisher = new BrokerPublisher(slo_rules_topic, prop.getProperty("broker_ip_url"), prop.getProperty("broker_username"), prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
publisher.publish(rules_json_string);
|
||||
Logger.getGlobal().log(info_logging_level, "Sent message\n" + rules_json_string);
|
||||
}
|
||||
}
|
||||
first_run = false;
|
||||
|
||||
@ -248,7 +240,17 @@ public class DirectorSubcomponent extends SLOViolationDetectorSubcomponent {
|
||||
|
||||
public static BiFunction<BrokerSubscriptionDetails, String, String> slo_rule_topic_subscriber_function = (broker_details, message) -> {
|
||||
//DetectorSubcomponent new_detector = new DetectorSubcomponent(application, CharacterizedThread.CharacterizedThreadRunMode.detached);
|
||||
String application = broker_details.getApplication_name();
|
||||
JSONParser parser = new JSONParser();
|
||||
String application;
|
||||
try{
|
||||
JSONObject object = (JSONObject) parser.parse(message);
|
||||
application = (String) object.get("name");
|
||||
Logger.getGlobal().log(info_logging_level,"Parsed a new slo rule for application "+application);
|
||||
}catch (ParseException e) {
|
||||
Logger.getGlobal().log(severe_logging_level,"Could not understand the slo rule which was received, skipping...");
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
DetectorSubcomponent new_detector = get_associated_detector(application);
|
||||
detector_subcomponents.put(application,new_detector);
|
||||
synchronized (new_detector.can_modify_slo_rules) {
|
||||
|
@ -1,11 +1,15 @@
|
||||
package slo_violation_detector_engine.generic;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
||||
|
||||
public class ComponentState {
|
||||
|
||||
public static Properties prop = new Properties();
|
||||
public static String broker_ip ="localhost";
|
||||
public static int broker_port = 5672;
|
||||
public static String broker_username= "admin";
|
||||
public static String broker_password= "admin";
|
||||
public static ArrayList<String> unbounded_metric_strings;
|
||||
}
|
||||
|
@ -3,12 +3,12 @@ package slo_violation_detector_engine.generic;
|
||||
//import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponentUtilities;
|
||||
import utility_beans.BrokerPublisher;
|
||||
import utility_beans.broker_communication.BrokerPublisher;
|
||||
import org.json.simple.JSONObject;
|
||||
import slo_rule_modelling.SLORule;
|
||||
import utility_beans.BrokerSubscriber;
|
||||
import utility_beans.BrokerSubscriptionDetails;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.broker_communication.BrokerSubscriber;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Clock;
|
||||
@ -38,8 +38,8 @@ public class Runnables {
|
||||
synchronized (detector.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_trigger_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(debug_data_topic_name).getValue())
|
||||
BrokerSubscriptionDetails broker_details = detector.getBrokerSubscriptionDetails(debug_data_trigger_topic_name);
|
||||
debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_details.getBroker_ip(),broker_details.getBroker_username(),broker_details.getBroker_password(), amq_library_configuration_location,detector.get_application_name());
|
||||
debug_data_subscriber.subscribe(debug_data_generation, detector.stop_signal);
|
||||
debug_data_subscriber = new BrokerSubscriber(debug_data_trigger_topic_name, broker_details.getBroker_ip(),broker_details.getBroker_port(),broker_details.getBroker_username(),broker_details.getBroker_password(), amq_library_configuration_location,detector.get_application_name());
|
||||
debug_data_subscriber.subscribe(debug_data_generation, EMPTY,detector.stop_signal);
|
||||
Logger.getGlobal().log(info_logging_level,"Debug data subscriber initiated");
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
@ -72,7 +72,7 @@ public class Runnables {
|
||||
public static Runnable get_severity_calculation_runnable(SLORule rule, DetectorSubcomponent detector) {
|
||||
|
||||
Runnable severity_calculation_runnable = () -> {
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip,broker_username,broker_password, amq_library_configuration_location);
|
||||
BrokerPublisher persistent_publisher = new BrokerPublisher(topic_for_severity_announcement, broker_ip,broker_port,broker_username,broker_password, amq_library_configuration_location);
|
||||
|
||||
while (!detector.stop_signal.get()) {
|
||||
synchronized (detector.PREDICTION_EXISTS) {
|
||||
|
@ -1,13 +1,13 @@
|
||||
package slo_violation_detector_engine.generic;
|
||||
|
||||
import utility_beans.*;
|
||||
import utility_beans.generic_component_functionality.OperationalMode;
|
||||
import utility_beans.synchronization.SynchronizedInteger;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.Properties;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
|
@ -1,6 +1,6 @@
|
||||
package slo_violation_detector_engine.generic;
|
||||
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
|
||||
public abstract class SLOViolationDetectorSubcomponent {
|
||||
public CharacterizedThread.CharacterizedThreadType thread_type;
|
||||
|
@ -4,11 +4,16 @@ package utilities;
|
||||
//import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import slo_violation_detector_engine.generic.Runnables;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import utility_beans.broker_communication.BrokerPublisher;
|
||||
import utility_beans.broker_communication.BrokerSubscriber;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.logging.Logger;
|
||||
@ -35,6 +40,7 @@ public class DebugDataSubscription {
|
||||
intermediate_debug_string.append("--------------------------:\n\t- ");
|
||||
for (String director_id : director_subcomponents.keySet()) {
|
||||
intermediate_debug_string.append("Details for threads belonging to director "+director_id+":\n");
|
||||
intermediate_debug_string.append("--------------------------:\n\t- ");
|
||||
for (String persistent_director_thread_name : director_subcomponents.get(director_id).persistent_running_director_threads.keySet()){
|
||||
if (flag_first_element_is_to_be_iterated) {
|
||||
intermediate_debug_string.append(persistent_director_thread_name);
|
||||
@ -59,6 +65,7 @@ public class DebugDataSubscription {
|
||||
intermediate_debug_string.append(",\n\t- ").append(s);
|
||||
}
|
||||
}
|
||||
intermediate_debug_string.append("\n");
|
||||
}
|
||||
intermediate_debug_string.append("\n\n");
|
||||
flag_first_element_is_to_be_iterated = true;
|
||||
@ -68,12 +75,13 @@ public class DebugDataSubscription {
|
||||
intermediate_debug_string.append("Details for slo-bound threads belonging to detector ").append(detector_id).append(":\n");
|
||||
for (String s : detector_subcomponents.get(detector_id).getSubcomponent_state().slo_bound_running_threads.keySet()) {
|
||||
if (flag_first_element_is_to_be_iterated) {
|
||||
intermediate_debug_string.append(s);
|
||||
intermediate_debug_string.append("\n\t").append(s);
|
||||
flag_first_element_is_to_be_iterated = false;
|
||||
} else {
|
||||
intermediate_debug_string.append(",\n\t- ").append(s);
|
||||
}
|
||||
}
|
||||
intermediate_debug_string.append("\n");
|
||||
}
|
||||
intermediate_debug_string.append("\n\n");
|
||||
output_debug_data = output_debug_data+intermediate_debug_string;
|
||||
@ -120,8 +128,9 @@ public class DebugDataSubscription {
|
||||
|
||||
for (String detector_id : detector_subcomponents.keySet()) {
|
||||
intermediate_debug_string.append("Details for threads belonging to detector ").append(detector_id).append(":\n");
|
||||
intermediate_debug_string.append("--------------------------:");
|
||||
intermediate_debug_string.append("\nShowing the adaptation times that pend processing:\n").append(detector_subcomponents.get(detector_id).getSubcomponent_state().adaptation_times_pending_processing);
|
||||
intermediate_debug_string.append("\nThese are the timestamps of the latest adaptation events\n").append(detector_subcomponents.get(detector_id).getSubcomponent_state().slo_violation_event_recording_queue);
|
||||
intermediate_debug_string.append("\nThese are the timestamps of the latest adaptation events\n").append(detector_subcomponents.get(detector_id).getSubcomponent_state().slo_violation_event_recording_queue).append("\n");
|
||||
}
|
||||
output_debug_data = output_debug_data+intermediate_debug_string;
|
||||
|
||||
@ -132,8 +141,8 @@ public class DebugDataSubscription {
|
||||
!broker_subscription_details.getBroker_password().equals(EMPTY)&&
|
||||
!broker_subscription_details.getBroker_username().equals(EMPTY)){
|
||||
|
||||
BrokerPublisher publisher = new BrokerPublisher(debug_data_output_topic_name, broker_subscription_details.getBroker_ip(),broker_subscription_details.getBroker_username(),broker_subscription_details.getBroker_password(), amq_library_configuration_location);
|
||||
publisher.publish(output_debug_data);
|
||||
BrokerPublisher publisher = new BrokerPublisher(debug_data_output_topic_name, broker_subscription_details.getBroker_ip(),broker_subscription_details.getBroker_port(),broker_subscription_details.getBroker_username(),broker_subscription_details.getBroker_password(), amq_library_configuration_location);
|
||||
publisher.publish(output_debug_data, Collections.singleton(EMPTY));
|
||||
}
|
||||
|
||||
return output_debug_data;
|
||||
|
@ -10,14 +10,14 @@ package utilities;
|
||||
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponentState;
|
||||
import utility_beans.MonitoringAttributeStatistics;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
import utility_beans.monitoring.MonitoringAttributeStatistics;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static configuration.Constants.epsilon;
|
||||
import static configuration.Constants.roc_limit;
|
||||
import static utility_beans.PredictedMonitoringAttribute.*;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.*;
|
||||
|
||||
public class MonitoringAttributeUtilities {
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
package utilities;
|
||||
|
||||
import utility_beans.OperationalMode;
|
||||
import utility_beans.generic_component_functionality.OperationalMode;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -9,7 +9,7 @@
|
||||
package utilities;
|
||||
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.logging.Logger;
|
||||
|
||||
|
@ -1,282 +0,0 @@
|
||||
package utility_beans;
|
||||
|
||||
import javax.swing.*;
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utilities.DebugDataSubscription.debug_data_trigger_topic_name;
|
||||
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class CustomDataPublisher {
|
||||
private static final Map<String, String> presetTexts = new HashMap<>();
|
||||
|
||||
static {
|
||||
update_event_data();
|
||||
}
|
||||
|
||||
private static void update_event_data(){
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.slo.new", "{\n" +
|
||||
" \"name\": \"_Application1\",\n" +
|
||||
" \"operator\": \"OR\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_and_memory_or_swap_too_high\",\n" +
|
||||
" \"operator\": \"AND\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_usage_high\",\n" +
|
||||
" \"metric\": \"cpu_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 80.0\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"memory_or_swap_usage_high\",\n" +
|
||||
" \"operator\": \"OR\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"memory_usage_high\",\n" +
|
||||
" \"metric\": \"ram_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 70.0\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"disk_usage_high\",\n" +
|
||||
" \"metric\": \"swap_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 50.0\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
"}");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.realtime.cpu_usage",
|
||||
"{\n" +
|
||||
" \"metricValue\": 12.34,\n" +
|
||||
" \"level\": 1,\n" +
|
||||
" \"component_id\":\"postgresql_1\",\n" +
|
||||
" \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" +
|
||||
"}\n");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.predicted.cpu_usage", "{\n" +
|
||||
" \"metricValue\": 92.34,\n" +
|
||||
" \"level\": 1,\n" +
|
||||
" \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" +
|
||||
" \"probability\": 0.98,\n" +
|
||||
" \"confidence_interval\" : [8,15]\n" +
|
||||
" \"predictionTime\": "+(int)(10+System.currentTimeMillis()/1000)+"\n" +
|
||||
"}");
|
||||
}
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
|
||||
boolean publisher_configuration_changed;
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
private_publisher_instance = new Publisher(slovid_publisher_key,broker_topic,true,true);
|
||||
publishers.add(private_publisher_instance);
|
||||
|
||||
|
||||
Connector connector = new Connector("slovid",
|
||||
new ConnectorHandler() {
|
||||
}, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
connector.start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
JFrame frame = new JFrame("Broker input app");
|
||||
JTextField broker_ipTextField = new JTextField("localhost", 10);
|
||||
JComboBox<String> smallTextField = new JComboBox<>(new String[]{"eu.nebulouscloud.monitoring.slo.new","eu.nebulouscloud.monitoring.realtime.cpu_usage", "eu.nebulouscloud.monitoring.predicted.cpu_usage", debug_data_trigger_topic_name});
|
||||
smallTextField.setEditable(true);
|
||||
JTextField othersmallTextField = new JTextField("slovid", 10);
|
||||
JTextArea largeTextArea = new JTextArea(10, 25);
|
||||
JButton submitButton = new JButton("Send");
|
||||
|
||||
AtomicReference<String> broker_ip = new AtomicReference<>();
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<String> message_payload = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_key = new AtomicReference<>();
|
||||
|
||||
smallTextField.addActionListener(e -> {
|
||||
update_event_data();
|
||||
String selectedOption = (String) smallTextField.getSelectedItem();
|
||||
String presetText = presetTexts.getOrDefault(selectedOption, "");
|
||||
largeTextArea.setText(presetText);
|
||||
});
|
||||
|
||||
submitButton.addActionListener(e -> {
|
||||
broker_ip.set(broker_ipTextField.getText());
|
||||
broker_topic.set(smallTextField.getSelectedItem().toString());
|
||||
message_payload.set(largeTextArea.getText());
|
||||
publisher_key.set(othersmallTextField.getText());
|
||||
CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(), broker_ip.toString(), "admin", "admin", EMPTY, publisher_key.toString());
|
||||
publisher.publish(message_payload.toString());
|
||||
});
|
||||
|
||||
JPanel panel = new JPanel();
|
||||
panel.add(new JLabel("Broker to publish to:"));
|
||||
panel.add(broker_ipTextField);
|
||||
panel.add(new JLabel("Topic to publish to:"));
|
||||
panel.add(smallTextField);
|
||||
panel.add(new JLabel("Key to publish with:"));
|
||||
panel.add(othersmallTextField);
|
||||
panel.add(new JLabel("Text to publish:"));
|
||||
panel.add(new JScrollPane(largeTextArea));
|
||||
panel.add(submitButton);
|
||||
|
||||
frame.add(panel);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
}
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
|
||||
}
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
|
||||
public void publish(JSONObject json_object) {
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class OldCustomDataPublisher {
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
public OldCustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
|
||||
boolean publisher_configuration_changed;
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
private_publisher_instance = new Publisher(slovid_publisher_key,broker_topic,true,true);
|
||||
publishers.add(private_publisher_instance);
|
||||
|
||||
|
||||
Connector connector = new Connector("slovid",
|
||||
new ConnectorHandler() {
|
||||
}, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
connector.start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public OldCustomDataPublisher(String broker_topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
this(broker_topic,broker_ip,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,slovid_publisher_key);
|
||||
}
|
||||
public static void main(String[] args){
|
||||
|
||||
//JSONObject msg = new JSONObject();
|
||||
//msg.put("key","value");
|
||||
|
||||
JFrame frame = new JFrame("Broker input app");
|
||||
JTextField broker_ipTextField = new JTextField("localhost",10);
|
||||
JTextField smallTextField = new JTextField("eu.nebulouscloud.monitoring.metric_list",30);
|
||||
JTextField othersmallTextField = new JTextField("slovid",20);
|
||||
JTextArea largeTextArea = new JTextArea(10, 30);
|
||||
JButton submitButton = new JButton("Send");
|
||||
|
||||
AtomicReference<String> broker_ip = new AtomicReference<>();
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<String> message_payload = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_key = new AtomicReference<>();
|
||||
|
||||
|
||||
submitButton.addActionListener(e -> {
|
||||
broker_ip.set(broker_ipTextField.getText());
|
||||
broker_topic.set(smallTextField.getText());
|
||||
message_payload.set(largeTextArea.getText());
|
||||
publisher_key.set(othersmallTextField.getText());
|
||||
OldCustomDataPublisher publisher = new OldCustomDataPublisher(broker_topic.toString(),broker_ip.toString(),"admin","admin",EMPTY,publisher_key.toString());
|
||||
publisher.publish(message_payload.toString());
|
||||
});
|
||||
|
||||
JPanel panel = new JPanel();
|
||||
panel.add(new JLabel("Broker to publish to:"));
|
||||
panel.add(broker_ipTextField);
|
||||
panel.add(new JLabel("Topic to publish to:"));
|
||||
panel.add(smallTextField);
|
||||
panel.add(new JLabel("Key to publish with:"));
|
||||
panel.add(othersmallTextField);
|
||||
panel.add(new JLabel("Text to publish:"));
|
||||
panel.add(new JScrollPane(largeTextArea));
|
||||
panel.add(submitButton);
|
||||
|
||||
frame.add(panel);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
|
||||
//publisher.publish(msg);
|
||||
}
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
|
||||
}
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
|
||||
public void publish(JSONObject json_object) {
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
package utility_beans;
|
||||
|
||||
public enum OperationalMode {
|
||||
DIRECTOR,DETECTOR
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package utility_beans;
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
@ -7,28 +7,29 @@ import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import utility_beans.broker_communication.CustomConnectorHandler;
|
||||
import utility_beans.broker_communication.ExtendedConnector;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Thread.sleep;
|
||||
import static java.util.logging.Level.INFO;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponent.detector_subcomponents;
|
||||
import static utilities.DebugDataSubscription.debug_data_output_topic_name;
|
||||
|
||||
public class BrokerPublisher {
|
||||
public static String EMPTY="";
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
|
||||
private Publisher private_publisher_instance;
|
||||
private ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
|
||||
private Connector active_connector;
|
||||
private ExtendedConnector active_connector;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
private int broker_port;
|
||||
|
||||
public BrokerPublisher(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
|
||||
|
||||
if (!able_to_initialize_BrokerPublisher){
|
||||
@ -37,9 +38,6 @@ public class BrokerPublisher {
|
||||
boolean publisher_configuration_changed;
|
||||
if (!broker_and_topics_to_publish_to.containsKey(broker_ip)){
|
||||
HashSet<String> topics_to_publish_to = new HashSet<>();
|
||||
topics_to_publish_to.add(debug_data_output_topic_name);
|
||||
topics_to_publish_to.add(topic_for_severity_announcement);
|
||||
//topics_to_publish_to.add(slo_rules_topic);
|
||||
topics_to_publish_to.add(topic);
|
||||
broker_and_topics_to_publish_to.put(broker_ip,topics_to_publish_to);
|
||||
publisher_configuration_changed = true;
|
||||
@ -56,39 +54,44 @@ public class BrokerPublisher {
|
||||
|
||||
if (publisher_configuration_changed){
|
||||
// for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
|
||||
publishers.clear();
|
||||
for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
|
||||
//ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
Publisher publisher = new Publisher(slovid_publisher_key + NAME_SEPARATOR +broker_topic, broker_topic, true, true);
|
||||
publishers.add(publisher);
|
||||
if (broker_topic.equals(topic)){
|
||||
this.private_publisher_instance = publishers.get(publishers.size()-1);
|
||||
this.topic = broker_topic;
|
||||
this.broker_ip = broker_ip;
|
||||
}
|
||||
}
|
||||
//CustomConnectorHandler custom_handler = new CustomConnectorHandler();
|
||||
|
||||
active_connector = new Connector("slovid"
|
||||
, new ConnectorHandler() {}
|
||||
, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
active_connector.start();
|
||||
//Logger.getGlobal().log(INFO,"Sending from EXTERIOR");
|
||||
//private_publisher_instance.send(new JSONObject());
|
||||
|
||||
Logger.getGlobal().log(Level.INFO,"Publisher configuration changed, creating new connector at "+broker_ip+" for topic "+topic);
|
||||
if (active_connector!=null) {
|
||||
active_connector.stop(new ArrayList<>(), publishers);
|
||||
}
|
||||
publishers.clear();
|
||||
for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
|
||||
//ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
Publisher publisher = new Publisher("resource_manager_"+broker_topic, broker_topic, true, true);
|
||||
publishers.add(publisher);
|
||||
if (broker_topic.equals(topic)){
|
||||
this.private_publisher_instance = publishers.get(publishers.size()-1);
|
||||
this.topic = broker_topic;
|
||||
this.broker_ip = broker_ip;
|
||||
this.broker_port = broker_port;
|
||||
}
|
||||
}
|
||||
//CustomConnectorHandler custom_handler = new CustomConnectorHandler();
|
||||
|
||||
active_connector = new ExtendedConnector("resource_manager"
|
||||
, new CustomConnectorHandler() {}
|
||||
, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
broker_port,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
active_connector.start();
|
||||
//Logger.getGlobal().log(INFO,"Sending from EXTERIOR");
|
||||
//private_publisher_instance.send(new JSONObject());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
//TODO The methods below assume that the only content to be sent is json-like
|
||||
@ -100,7 +103,7 @@ public class BrokerPublisher {
|
||||
try {
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
} catch (ParseException p) {
|
||||
Logger.getGlobal().log(Level.WARNING, "Could not parse the string content to be published to the broker as json");
|
||||
Logger.getGlobal().log(Level.WARNING, "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content);
|
||||
}
|
||||
if (private_publisher_instance != null) {
|
||||
private_publisher_instance.send(json_object);
|
||||
@ -109,8 +112,5 @@ public class BrokerPublisher {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void publish(String json_string_content) {
|
||||
publish(json_string_content,detector_subcomponents.keySet());
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package utility_beans;
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Context;
|
||||
@ -18,99 +18,114 @@ import static java.util.logging.Level.INFO;
|
||||
|
||||
public class BrokerSubscriber {
|
||||
|
||||
private class MessageProcessingHandler extends Handler{
|
||||
private class MessageProcessingHandler extends Handler {
|
||||
private BrokerSubscriptionDetails broker_details;
|
||||
private static final BiFunction temporary_function = (Object o, Object o2) -> {
|
||||
//System.out.println("");
|
||||
Logger.getGlobal().log(INFO,"REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
|
||||
Logger.getGlobal().log(INFO, "REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
|
||||
return "IN_PROCESSING";
|
||||
};
|
||||
private BiFunction<BrokerSubscriptionDetails,String,String> processing_function;
|
||||
private BiFunction<BrokerSubscriptionDetails, String, String> processing_function;
|
||||
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
Logger.getGlobal().log(info_logging_level,"Handling message for address "+address);
|
||||
Logger.getGlobal().log(INFO, "Handling message for address " + address);
|
||||
processing_function.apply(broker_details, JSONValue.toJSONString(body));
|
||||
}
|
||||
public MessageProcessingHandler(BrokerSubscriptionDetails broker_details){
|
||||
|
||||
public MessageProcessingHandler(BrokerSubscriptionDetails broker_details) {
|
||||
this.broker_details = broker_details;
|
||||
this.processing_function = temporary_function;
|
||||
}
|
||||
public MessageProcessingHandler(BiFunction<BrokerSubscriptionDetails,String,String> biFunction, BrokerSubscriptionDetails broker_details){
|
||||
|
||||
public MessageProcessingHandler(BiFunction<BrokerSubscriptionDetails, String, String> biFunction, BrokerSubscriptionDetails broker_details) {
|
||||
this.broker_details = broker_details;
|
||||
this.processing_function = biFunction;
|
||||
}
|
||||
|
||||
public BiFunction getProcessing_function() {
|
||||
return processing_function;
|
||||
}
|
||||
|
||||
public void setProcessing_function(BiFunction processing_function) {
|
||||
this.processing_function = processing_function;
|
||||
}
|
||||
}
|
||||
|
||||
private static HashMap<String, HashSet<String>> broker_and_topics_to_subscribe_to = new HashMap<>();
|
||||
private static HashMap<String,HashMap<String,Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
|
||||
private static HashMap<String,ExtendedConnector> current_connectors = new HashMap<>();
|
||||
private static HashMap<String, HashMap<String, Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
|
||||
private static HashMap<String, ExtendedConnector> current_connectors = new HashMap<>();
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
private int broker_port;
|
||||
private String brokerUsername;
|
||||
private String brokerPassword;
|
||||
BrokerSubscriptionDetails broker_details;
|
||||
public BrokerSubscriber(String topic, String broker_ip, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String application_name){
|
||||
boolean able_to_initialize_BrokerSubscriber = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
|
||||
|
||||
if (!able_to_initialize_BrokerSubscriber){
|
||||
public BrokerSubscriber(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, String application_name) {
|
||||
boolean able_to_initialize_BrokerSubscriber = topic != null && broker_ip != null && broker_port >0 && brokerUsername != null && brokerPassword != null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
|
||||
|
||||
if (!able_to_initialize_BrokerSubscriber) {
|
||||
try {
|
||||
throw new Exception("Unable to initialize Subscriber");
|
||||
} catch (Exception e) {
|
||||
String message = "Topic is "+topic+" broker ip is "+broker_ip+" broker username/pass are "+brokerUsername+","+brokerPassword;
|
||||
String message = "Topic is " + topic + " broker ip is " + broker_ip + " broker port is "+broker_port+" broker username/pass are " + brokerUsername + "," + brokerPassword;
|
||||
|
||||
Logger.getGlobal().log(INFO,message);
|
||||
Logger.getGlobal().log(INFO, message);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
broker_details = new BrokerSubscriptionDetails(broker_ip,brokerUsername,brokerPassword,application_name,topic);
|
||||
broker_details = new BrokerSubscriptionDetails(broker_ip, brokerUsername, brokerPassword, application_name, topic);
|
||||
boolean subscriber_configuration_changed;
|
||||
if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)){
|
||||
if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)) {
|
||||
HashSet<String> topics_to_subscribe_to = new HashSet<>();
|
||||
//topics_to_subscribe_to.add(realtime_metric_topic_name);
|
||||
//topics_to_subscribe_to.add(forecasted_metric_topic_name);
|
||||
//topics_to_subscribe_to.add();
|
||||
topics_to_subscribe_to.add(topic);
|
||||
broker_and_topics_to_subscribe_to.put(broker_ip,new HashSet<>());
|
||||
active_consumers_per_topic_per_broker_ip.put(broker_ip,new HashMap<>());
|
||||
broker_and_topics_to_subscribe_to.put(broker_ip, new HashSet<>());
|
||||
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
|
||||
broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
|
||||
|
||||
subscriber_configuration_changed = true;
|
||||
}else{
|
||||
if (!broker_and_topics_to_subscribe_to.get(broker_ip).contains(topic)){
|
||||
} else {
|
||||
if (!broker_and_topics_to_subscribe_to.get(broker_ip).contains(topic)) {
|
||||
broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
|
||||
subscriber_configuration_changed = true;
|
||||
}
|
||||
else{
|
||||
} else {
|
||||
subscriber_configuration_changed = false;
|
||||
}
|
||||
}
|
||||
if (subscriber_configuration_changed){
|
||||
Consumer current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details),true,true);
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,current_consumer);
|
||||
if (subscriber_configuration_changed) {
|
||||
Consumer current_consumer;
|
||||
if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
|
||||
Logger.getGlobal().log(INFO, "APP level subscriber " + topic);
|
||||
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
|
||||
} else { //Allow the consumer to get information from any publisher
|
||||
current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
|
||||
Logger.getGlobal().log(INFO, "HIGH level subscriber " + topic);
|
||||
}
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, current_consumer);
|
||||
|
||||
this.topic = topic;
|
||||
this.broker_ip = broker_ip;
|
||||
this.brokerUsername = brokerUsername;
|
||||
this.brokerPassword = brokerPassword;
|
||||
add_topic_consumer_to_broker_connector(current_consumer);
|
||||
this.topic = topic;
|
||||
this.broker_ip = broker_ip;
|
||||
this.broker_port = broker_port;
|
||||
this.brokerUsername = brokerUsername;
|
||||
this.brokerPassword = brokerPassword;
|
||||
add_topic_consumer_to_broker_connector(current_consumer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method updates the global connector of SLOViD to the AMQP server, by adding support for one more component
|
||||
* This method updates the global connector of Resource manager to the AMQP server, by adding support for one more component
|
||||
*/
|
||||
private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
|
||||
if (current_connectors.get(broker_ip)!=null) {
|
||||
if (current_connectors.get(broker_ip) != null) {
|
||||
current_connectors.get(broker_ip).add_consumer(new_consumer);
|
||||
}else {
|
||||
} else {
|
||||
ArrayList<Consumer> consumers = new ArrayList<>();
|
||||
consumers.add(new_consumer);
|
||||
ExtendedConnector extended_connector = new ExtendedConnector("slovid",
|
||||
ExtendedConnector extended_connector = new ExtendedConnector("resource_manager",
|
||||
new CustomConnectorHandler() {
|
||||
},
|
||||
List.of(),
|
||||
@ -119,7 +134,7 @@ public class BrokerSubscriber {
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
5672,
|
||||
broker_port,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
@ -131,44 +146,50 @@ public class BrokerSubscriber {
|
||||
}
|
||||
}
|
||||
|
||||
private void remove_topic_from_broker_connector (String topic_key){
|
||||
if (current_connectors.get(broker_ip)!=null){
|
||||
private void remove_topic_from_broker_connector(String topic_key) {
|
||||
if (current_connectors.get(broker_ip) != null) {
|
||||
current_connectors.get(broker_ip).remove_consumer_with_key(topic_key);
|
||||
}
|
||||
}
|
||||
|
||||
public int subscribe(BiFunction function, AtomicBoolean stop_signal) {
|
||||
public int subscribe(BiFunction function, String application_name, AtomicBoolean stop_signal) {
|
||||
int exit_status = -1;
|
||||
Logger.getGlobal().log(INFO,"ESTABLISHING SUBSCRIPTION for "+topic);
|
||||
Logger.getGlobal().log(INFO, "ESTABLISHING SUBSCRIPTION for " + topic);
|
||||
//First remove any leftover consumer
|
||||
if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)) {
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
|
||||
remove_topic_from_broker_connector(topic);
|
||||
}else{
|
||||
active_consumers_per_topic_per_broker_ip.put(broker_ip,new HashMap<>());
|
||||
} else {
|
||||
active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
|
||||
}
|
||||
//Then add the new consumer
|
||||
Consumer new_consumer = new Consumer(topic,topic,new MessageProcessingHandler(function,broker_details),true,true);
|
||||
new_consumer.setProperty("topic",topic);
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic,new_consumer);
|
||||
Consumer new_consumer;
|
||||
if (application_name != null && !application_name.equals(EMPTY)) {
|
||||
new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), application_name,
|
||||
true, true);
|
||||
} else {
|
||||
new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), true, true);
|
||||
}
|
||||
new_consumer.setProperty("topic", topic);
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);
|
||||
add_topic_consumer_to_broker_connector(new_consumer);
|
||||
|
||||
Logger.getGlobal().log(INFO,"ESTABLISHED SUBSCRIPTION to topic "+topic);
|
||||
synchronized (stop_signal){
|
||||
while (!stop_signal.get()){
|
||||
try{
|
||||
Logger.getGlobal().log(INFO, "ESTABLISHED SUBSCRIPTION to topic " + topic);
|
||||
synchronized (stop_signal) {
|
||||
while (!stop_signal.get()) {
|
||||
try {
|
||||
stop_signal.wait();
|
||||
}catch (Exception e){
|
||||
Logger.getGlobal().log(Level.WARNING,e.toString()+" in thread "+Thread.currentThread().getName());
|
||||
} catch (Exception e) {
|
||||
Logger.getGlobal().log(Level.WARNING, e.toString() + " in thread " + Thread.currentThread().getName());
|
||||
break;
|
||||
}
|
||||
}
|
||||
Logger.getGlobal().log(INFO,"Stopping subscription for broker "+broker_ip+" and topic "+topic + "at thread "+Thread.currentThread().getName());
|
||||
Logger.getGlobal().log(INFO, "Stopping subscription for broker " + broker_ip + " and topic " + topic + "at thread " + Thread.currentThread().getName());
|
||||
stop_signal.set(false);
|
||||
}
|
||||
active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
|
||||
remove_topic_from_broker_connector(topic);
|
||||
exit_status=0;
|
||||
exit_status = 0;
|
||||
return exit_status;
|
||||
}
|
||||
|
||||
@ -187,5 +208,5 @@ public class BrokerSubscriber {
|
||||
public static String final_metric_predictions_topic(String metric) {
|
||||
return topic_prefix_final_predicted_metrics +metric;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,12 +1,12 @@
|
||||
package utility_beans;
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import static configuration.Constants.EMPTY;
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static configuration.Constants.*;
|
||||
|
||||
public class BrokerSubscriptionDetails {
|
||||
String broker_username = "admin";
|
||||
String broker_password = "admin";
|
||||
String broker_ip = "localhost";
|
||||
int broker_port = 5672;
|
||||
String application_name = default_application_name;
|
||||
String topic = EMPTY;
|
||||
|
||||
@ -67,4 +67,12 @@ public class BrokerSubscriptionDetails {
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public int getBroker_port() {
|
||||
return broker_port;
|
||||
}
|
||||
|
||||
public void setBroker_port(int broker_port) {
|
||||
this.broker_port = broker_port;
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package utility_beans;
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Context;
|
@ -0,0 +1,289 @@
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import javax.swing.*;
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
|
||||
import java.awt.*;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utilities.DebugDataSubscription.debug_data_trigger_topic_name;
|
||||
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class CustomDataPublisher {
|
||||
private static final Map<String, String> presetTexts = new HashMap<>();
|
||||
|
||||
static {
|
||||
update_event_data();
|
||||
}
|
||||
|
||||
private static void update_event_data(){
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.slo.new", "{\n" +
|
||||
" \"name\": \"_Application1\",\n" +
|
||||
" \"operator\": \"OR\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_and_memory_or_swap_too_high\",\n" +
|
||||
" \"operator\": \"AND\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_usage_high\",\n" +
|
||||
" \"metric\": \"cpu_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 80.0\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"memory_or_swap_usage_high\",\n" +
|
||||
" \"operator\": \"OR\",\n" +
|
||||
" \"constraints\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"memory_usage_high\",\n" +
|
||||
" \"metric\": \"ram_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 70.0\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"disk_usage_high\",\n" +
|
||||
" \"metric\": \"swap_usage\",\n" +
|
||||
" \"operator\": \">\",\n" +
|
||||
" \"threshold\": 50.0\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
"}");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.realtime.cpu_usage",
|
||||
"{\n" +
|
||||
" \"metricValue\": 12.34,\n" +
|
||||
" \"level\": 1,\n" +
|
||||
" \"component_id\":\"postgresql_1\",\n" +
|
||||
" \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" +
|
||||
"}\n");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.predicted.cpu_usage", "{\n" +
|
||||
" \"metricValue\": 92.34,\n" +
|
||||
" \"level\": 1,\n" +
|
||||
" \"timestamp\": "+(int)(System.currentTimeMillis()/1000)+"\n" +
|
||||
" \"probability\": 0.98,\n" +
|
||||
" \"confidence_interval\" : [8,15]\n" +
|
||||
" \"predictionTime\": "+(int)(10+System.currentTimeMillis()/1000)+"\n" +
|
||||
"}");
|
||||
presetTexts.put("eu.nebulouscloud.monitoring.metric_list","{\n" +
|
||||
" \"name\": \"_Application1\",\n" +
|
||||
" \"version\": 1,\n" +
|
||||
" \"metric_list\": [\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"cpu_usage\",\n" +
|
||||
" \"upper_bound\": \"100.0\",\n" +
|
||||
" \"lower_bound\": \"0.0\"\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"name\": \"ram_usage\",\n" +
|
||||
" \"upper_bound\": \"100.0\",\n" +
|
||||
" \"lower_bound\": \"0.0\"\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
"}");
|
||||
presetTexts.put("eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing","{\n" +
|
||||
" \"name\": \"_Application1\",\n" +
|
||||
" \"metrics\": [\"cpu_usage\"],\n" +
|
||||
" \"timestamp\": 1705046535,\n" +
|
||||
" \"epoch_start\": 1705046500,\n" +
|
||||
" \"number_of_forward_predictions\": 5,\n" +
|
||||
" \"prediction_horizon\": 10\n" +
|
||||
"}");
|
||||
presetTexts.put(debug_data_trigger_topic_name,"{}");
|
||||
}
|
||||
private Publisher private_publisher_instance;
|
||||
private String topic;
|
||||
private String broker_ip;
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, Integer broker_port,String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,String publisher_key) {
|
||||
|
||||
boolean publisher_configuration_changed;
|
||||
ArrayList<Publisher> publishers = new ArrayList<>();
|
||||
private_publisher_instance = new Publisher(slovid_subscriber_key,broker_topic,true,true);
|
||||
publishers.add(private_publisher_instance);
|
||||
|
||||
|
||||
Connector connector = new Connector("slovid",
|
||||
new ConnectorHandler() {
|
||||
}, publishers
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
broker_port,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
connector.start();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public CustomDataPublisher(String broker_topic, String broker_ip, Integer broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
|
||||
this(broker_topic,broker_ip,broker_port,brokerUsername,brokerPassword,amqLibraryConfigurationLocation, slovid_subscriber_key);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
JFrame frame = new JFrame("Broker input app");
|
||||
JTextField broker_ipTextField = new JTextField("localhost", 30);
|
||||
JTextField broker_portTextField = new JTextField("5672", 20);
|
||||
JComboBox<String> TopicTextField = new JComboBox<>(new String[]{"eu.nebulouscloud.monitoring.slo.new","eu.nebulouscloud.monitoring.realtime.cpu_usage", "eu.nebulouscloud.monitoring.predicted.cpu_usage", "eu.nebulouscloud.monitoring.metric_list","eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing",debug_data_trigger_topic_name});
|
||||
TopicTextField.setEditable(true);
|
||||
JTextField PublisherKeyTextField = new JTextField("slovid", 20);
|
||||
JTextField PublisherApplicationTextField = new JTextField(default_application_name, 20);
|
||||
JTextArea largeTextArea = new JTextArea(10, 30);
|
||||
JButton submitButton = new JButton("Send");
|
||||
|
||||
AtomicReference<String> broker_ip = new AtomicReference<>();
|
||||
AtomicReference<Integer> broker_port = new AtomicReference<>();
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<String> message_payload = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_key = new AtomicReference<>();
|
||||
AtomicReference<String> publisher_app = new AtomicReference<>();
|
||||
|
||||
TopicTextField.addActionListener(e -> {
|
||||
update_event_data();
|
||||
String selectedOption = (String) TopicTextField.getSelectedItem();
|
||||
String presetText = presetTexts.getOrDefault(selectedOption, "");
|
||||
largeTextArea.setText(presetText);
|
||||
});
|
||||
|
||||
submitButton.addActionListener(e -> {
|
||||
broker_ip.set(broker_ipTextField.getText());
|
||||
broker_port.set(Integer.parseInt(broker_portTextField.getText()));
|
||||
broker_topic.set(TopicTextField.getSelectedItem().toString());
|
||||
message_payload.set(largeTextArea.getText());
|
||||
publisher_key.set(PublisherKeyTextField.getText());
|
||||
publisher_app.set(PublisherApplicationTextField.getText());
|
||||
CustomDataPublisher publisher = new CustomDataPublisher(broker_topic.toString(), broker_ip.toString(), broker_port.get(),"admin", "admin", EMPTY, publisher_key.toString());
|
||||
if (broker_topic.toString().equals(metric_list_topic)||broker_topic.toString().equals(slo_rules_topic)||broker_topic.toString().equals(debug_data_trigger_topic_name)||broker_topic.toString().equals(topic_for_lost_device_announcement)){
|
||||
publisher.publish(message_payload.toString(), String.valueOf(publisher_app)); //second argument was EMPTY
|
||||
}else{
|
||||
publisher.publish(message_payload.toString(), String.valueOf(publisher_app));
|
||||
}
|
||||
});
|
||||
|
||||
JPanel broker_config_panel = new JPanel(new GridBagLayout());
|
||||
GridBagConstraints c = new GridBagConstraints();
|
||||
c.insets = new Insets(5, 5, 5, 5); // This will add 5 pixels of space on all sides of each component
|
||||
|
||||
int init_value_x =0;
|
||||
int init_value_y =0;
|
||||
|
||||
c.gridx = init_value_x;
|
||||
c.gridy = init_value_y;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Broker to publish to:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(broker_ipTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Broker port to use:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(broker_portTextField, c);
|
||||
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Topic to publish to:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(TopicTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Key to publish with:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(PublisherKeyTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Application to publish with:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(PublisherApplicationTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Text to publish:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
c.gridwidth = 2;
|
||||
c.fill = GridBagConstraints.BOTH;
|
||||
c.weightx = 1.0;
|
||||
c.weighty = 1.0;
|
||||
broker_config_panel.add(new JScrollPane(largeTextArea), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
c.gridwidth = 1;
|
||||
c.fill = GridBagConstraints.NONE;
|
||||
c.weightx = 0.0;
|
||||
c.weighty = 0.0;
|
||||
broker_config_panel.add(submitButton, c);
|
||||
|
||||
frame.add(broker_config_panel);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
}
|
||||
|
||||
//TODO This assumes that the only content to be sent is json-like
|
||||
public void publish(String json_string_content, String application_name) {
|
||||
JSONParser parser = new JSONParser();
|
||||
JSONObject json_object = new JSONObject();
|
||||
try{
|
||||
json_object = (JSONObject) parser.parse(json_string_content);
|
||||
}catch (ParseException p){
|
||||
Logger.getGlobal().log(Level.SEVERE,"Could not parse the string content");
|
||||
}
|
||||
if (application_name!=null && !application_name.equals(EMPTY)){
|
||||
private_publisher_instance.send(json_object, application_name);
|
||||
}else {
|
||||
private_publisher_instance.send(json_object);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,516 @@
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Context;
|
||||
import eu.nebulouscloud.exn.core.Handler;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.StaticExnConfig;
|
||||
import org.apache.qpid.protonj2.client.Message;
|
||||
import org.apache.qpid.protonj2.client.exceptions.ClientException;
|
||||
import utility_beans.synchronization.SynchronizedBoolean;
|
||||
|
||||
import javax.swing.*;
|
||||
import java.awt.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static utilities.DebugDataSubscription.debug_data_trigger_topic_name;
|
||||
|
||||
public class CustomDataSubscriber {
|
||||
private static final Map<String, String> presetTexts = new HashMap<>();
|
||||
|
||||
private static final SynchronizedBoolean new_message_arrived = new SynchronizedBoolean();
|
||||
private static AtomicReference<String> message_payload = new AtomicReference<>(EMPTY);
|
||||
|
||||
static class SimpleConnectorHandler extends ConnectorHandler{
|
||||
private String broker_topic;
|
||||
private String broker_ip;
|
||||
|
||||
private String application_name;
|
||||
|
||||
Handler on_message_handler;
|
||||
SimpleConnectorHandler(String broker_ip,String broker_topic, String application_name){
|
||||
this.broker_topic = broker_topic;
|
||||
this.broker_ip = broker_ip;
|
||||
this.application_name = application_name;
|
||||
|
||||
this.on_message_handler = new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
Logger.getGlobal().log(info_logging_level,"Received message with payload"+body.toString()+"\n");
|
||||
synchronized (new_message_arrived) {
|
||||
new_message_arrived.notify();
|
||||
new_message_arrived.setValue(true);
|
||||
message_payload = new AtomicReference<>(body.toString());
|
||||
//message_payload = body.toString();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
@Override
|
||||
public void onReady(Context context) {
|
||||
super.onReady(context);
|
||||
Consumer consumer = new Consumer(slovid_subscriber_key,broker_topic,on_message_handler,
|
||||
application_name,true,true);
|
||||
Logger.getGlobal().log(info_logging_level,"Unregistering old consumer");
|
||||
//context.unregisterConsumer(slovid_subscriber_key);
|
||||
Logger.getGlobal().log(info_logging_level,"Registering new consumer");
|
||||
context.registerConsumer(consumer);
|
||||
}
|
||||
}
|
||||
|
||||
static {
|
||||
update_event_data();
|
||||
}
|
||||
|
||||
private static SynchronizedBoolean notify_create_subscriber = new SynchronizedBoolean(false);
|
||||
private static void update_event_data(){
|
||||
presetTexts.put(debug_data_trigger_topic_name,"{}");
|
||||
}
|
||||
private static ExtendedConnector private_connector;
|
||||
|
||||
private void stop_connector(){
|
||||
try {
|
||||
private_connector.stop();
|
||||
}
|
||||
catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
public CustomDataSubscriber(String broker_topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, String subscriber_key, String application_name) {
|
||||
|
||||
private_connector = new ExtendedConnector("slovid",
|
||||
new SimpleConnectorHandler(broker_ip, broker_topic, application_name),
|
||||
List.of()
|
||||
, List.of(),
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip,
|
||||
broker_port,
|
||||
brokerUsername,
|
||||
brokerPassword,
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting private connector");
|
||||
private_connector.start();
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) {
|
||||
AtomicReference<CustomDataSubscriber> subscriber = new AtomicReference<>();
|
||||
AtomicReference<String> broker_ip = new AtomicReference<>();
|
||||
AtomicReference<String> broker_topic = new AtomicReference<>();
|
||||
AtomicReference<Integer> broker_port = new AtomicReference<>();
|
||||
AtomicReference<String> subscriber_key = new AtomicReference<>();
|
||||
AtomicReference<String> subscriber_application = new AtomicReference<>();
|
||||
|
||||
AtomicBoolean consumer_has_started = new AtomicBoolean(false);
|
||||
|
||||
broker_topic.set("eu.nebulouscloud.monitoring.slo.nef");
|
||||
broker_ip.set("localhost");
|
||||
subscriber_key.set("test_subscriber");
|
||||
subscriber_application.set("test_application");
|
||||
Thread t = new Thread(){
|
||||
@Override
|
||||
public void run() {
|
||||
if (subscriber.get()!=null){
|
||||
subscriber.get().stop_connector();
|
||||
}
|
||||
subscriber.set(new CustomDataSubscriber(broker_topic.toString(), broker_ip.toString(), 5672,"admin", "admin", EMPTY, subscriber_key.toString(), subscriber_application.toString()));
|
||||
CustomDataSubscriber.private_connector.start();
|
||||
}
|
||||
};
|
||||
//t.start();
|
||||
|
||||
|
||||
|
||||
/*
|
||||
class MyConnectorHandler extends ConnectorHandler{
|
||||
@Override
|
||||
public void onReady(Context context) {
|
||||
Logger.getGlobal().log(info_logging_level,"Starting the connector handler");
|
||||
}
|
||||
}
|
||||
|
||||
Connector private_connector = new Connector("slovid",
|
||||
new MyConnectorHandler(){
|
||||
|
||||
},List.of(),
|
||||
List.of(
|
||||
new Consumer("localhost", "eu.nebulouscloud.monitoring.slo.new", new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
super.onMessage(key, address, body, message, context);
|
||||
Logger.getGlobal().log(info_logging_level,"A message, "+body.toString()+" has arrived at address "+address);
|
||||
}
|
||||
},true,true)
|
||||
)
|
||||
,
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
"localhost",
|
||||
5672,
|
||||
"admin",
|
||||
"admin",
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
Logger.getGlobal().log(info_logging_level,"Starting private connector");
|
||||
private_connector.start();
|
||||
*/
|
||||
|
||||
|
||||
|
||||
JFrame frame = new JFrame("Broker subscriber app");
|
||||
JTextField broker_ipTextField = new JTextField("localhost", 30);
|
||||
//JComboBox<String> TopicTextField = new JComboBox<>(new String[]{"eu.nebulouscloud.monitoring.slo.new","eu.nebulouscloud.monitoring.realtime.cpu_usage", "eu.nebulouscloud.monitoring.predicted.cpu_usage", "eu.nebulouscloud.monitoring.metric_list","eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing",debug_data_trigger_topic_name});
|
||||
//TopicTextField.setEditable(true);
|
||||
JTextField TopicTextField = new JTextField("",30);
|
||||
JTextField ConsumerKeyTextField = new JTextField("slovid", 20);
|
||||
JTextField broker_portTextField = new JTextField("5672",20);
|
||||
JTextField ConsumerApplicationTextField = new JTextField(default_application_name, 20);
|
||||
|
||||
JTextArea subscriptionTextArea = new JTextArea(10, 30);
|
||||
subscriptionTextArea.setBackground(Color.LIGHT_GRAY);
|
||||
subscriptionTextArea.setForeground(Color.BLUE);
|
||||
subscriptionTextArea.setBorder(BorderFactory.createLineBorder(Color.DARK_GRAY, 2));
|
||||
subscriptionTextArea.setWrapStyleWord(true);
|
||||
|
||||
//Topic listing functionality, unavailable
|
||||
/*
|
||||
JTextArea topic_list_TextArea = new JTextArea(10, 30);
|
||||
subscriptionTextArea.setBackground(Color.LIGHT_GRAY);
|
||||
subscriptionTextArea.setForeground(Color.BLUE);
|
||||
subscriptionTextArea.setBorder(BorderFactory.createLineBorder(Color.DARK_GRAY, 2));
|
||||
subscriptionTextArea.setWrapStyleWord(true);
|
||||
*/
|
||||
|
||||
JButton subscribeButton = new JButton("Start subscription");
|
||||
//JButton get_topic_listButton = new JButton("Refresh topic list");
|
||||
|
||||
/*get_topic_listButton.addActionListener(e -> {
|
||||
private_connector.getHandler().ctx.
|
||||
});*/
|
||||
|
||||
subscribeButton.addActionListener(e -> {
|
||||
broker_ip.set(broker_ipTextField.getText());
|
||||
broker_topic.set(TopicTextField.getText());
|
||||
broker_port.set(Integer.parseInt(broker_portTextField.getText()));
|
||||
subscriber_key.set(ConsumerKeyTextField.getText());
|
||||
subscriber_application.set(ConsumerApplicationTextField.getText());
|
||||
//notify_create_subscriber.notify();
|
||||
|
||||
|
||||
if (consumer_has_started.get()) {
|
||||
Logger.getGlobal().log(info_logging_level,"Removing consumer");
|
||||
private_connector.remove_consumer_with_key(subscriber_key.get());
|
||||
Logger.getGlobal().log(info_logging_level,"Consumer should be stopped now");
|
||||
Consumer consumer;
|
||||
if (subscriber_application.get()!=null && !subscriber_application.get().isEmpty()) {
|
||||
Logger.getGlobal().log(info_logging_level, "The application for which the subscriber is created is the following " + subscriber_application.get());
|
||||
consumer = new Consumer(subscriber_key.get(), broker_topic.get(), new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
super.onMessage(key, address, body, message, context);
|
||||
try {
|
||||
Logger.getGlobal().log(info_logging_level, "A message, " + body.toString() + " has arrived at address " + address + " for application " + message.subject());
|
||||
} catch (ClientException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
String old_text = subscriptionTextArea.getText() + "\n";
|
||||
subscriptionTextArea.setText(old_text + body);
|
||||
}
|
||||
}, subscriber_application.get(), true, true);
|
||||
}
|
||||
else{
|
||||
Logger.getGlobal().log(info_logging_level, "Subscribing for all applications");
|
||||
consumer = new Consumer(subscriber_key.get(), broker_topic.get(), new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
super.onMessage(key, address, body, message, context);
|
||||
try {
|
||||
Logger.getGlobal().log(info_logging_level, "A message, " + body.toString() + " has arrived at address " + address + " for application " + message.subject());
|
||||
} catch (ClientException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
String old_text = subscriptionTextArea.getText() + "\n";
|
||||
subscriptionTextArea.setText(old_text + body);
|
||||
}
|
||||
}, true, true);
|
||||
}
|
||||
private_connector.add_consumer(consumer);
|
||||
}
|
||||
else {
|
||||
if (!subscriber_application.get().isEmpty()) {
|
||||
Logger.getGlobal().log(info_logging_level, "Starting the connector for the consumption of messages with " + subscriber_key + " at " + broker_topic.get() + " for application " + subscriber_application.get());
|
||||
consumer_has_started.set(true);
|
||||
private_connector = new ExtendedConnector("slovid",
|
||||
new CustomConnectorHandler() {
|
||||
private Consumer current_consumer = null;
|
||||
AtomicBoolean consumer_has_started = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void onReady(Context context) {
|
||||
super.onReady(context);
|
||||
Logger.getGlobal().log(info_logging_level, "On ready activated");
|
||||
if (current_consumer != null) {
|
||||
context.unregisterConsumer(current_consumer.key());
|
||||
Logger.getGlobal().log(info_logging_level, "Unregistering consumer");
|
||||
}
|
||||
Consumer consumer = new Consumer(subscriber_key.get(), broker_topic.get(), new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
super.onMessage(key, address, body, message, context);
|
||||
try {
|
||||
Logger.getGlobal().log(info_logging_level, "A message, " + body.toString() + " has arrived at address " + address + " for application " + message.subject());
|
||||
} catch (ClientException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
String old_text = subscriptionTextArea.getText() + "\n";
|
||||
subscriptionTextArea.setText(old_text + body);
|
||||
}
|
||||
}, subscriber_application.get(), true, true);
|
||||
context.registerConsumer(consumer);
|
||||
}
|
||||
}, List.of(),
|
||||
List.of(
|
||||
)
|
||||
,
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip.get(),
|
||||
broker_port.get(),
|
||||
"admin",
|
||||
"admin",
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
}else {
|
||||
if (!subscriber_application.get().isEmpty()) {
|
||||
Logger.getGlobal().log(info_logging_level, "Starting the connector for the consumption of messages with " + subscriber_key + " at " + broker_topic.get() + " for application "+subscriber_application.get());
|
||||
consumer_has_started.set(true);
|
||||
private_connector = new ExtendedConnector("slovid",
|
||||
new CustomConnectorHandler() {
|
||||
private Consumer current_consumer = null;
|
||||
AtomicBoolean consumer_has_started = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void onReady(Context context) {
|
||||
super.onReady(context);
|
||||
Logger.getGlobal().log(info_logging_level, "On ready activated");
|
||||
if (current_consumer != null) {
|
||||
context.unregisterConsumer(current_consumer.key());
|
||||
Logger.getGlobal().log(info_logging_level, "Unregistering consumer");
|
||||
}
|
||||
Consumer consumer = new Consumer(subscriber_key.get(), broker_topic.get(), new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
super.onMessage(key, address, body, message, context);
|
||||
try {
|
||||
Logger.getGlobal().log(info_logging_level, "A message, " + body.toString() + " has arrived at address " + address + " for application " + message.subject());
|
||||
} catch (ClientException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
String old_text = subscriptionTextArea.getText() + "\n";
|
||||
subscriptionTextArea.setText(old_text + body);
|
||||
}
|
||||
}, subscriber_application.get(), true, true);
|
||||
context.registerConsumer(consumer);
|
||||
}
|
||||
}, List.of(),
|
||||
List.of(
|
||||
)
|
||||
,
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip.get(),
|
||||
broker_port.get(),
|
||||
"admin",
|
||||
"admin",
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
}else{
|
||||
Logger.getGlobal().log(info_logging_level, "Starting the connector for the consumption of messages with " + subscriber_key + " at " + broker_topic.get() + " for all applications");
|
||||
consumer_has_started.set(true);
|
||||
private_connector = new ExtendedConnector("slovid",
|
||||
new CustomConnectorHandler() {
|
||||
private Consumer current_consumer = null;
|
||||
AtomicBoolean consumer_has_started = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void onReady(Context context) {
|
||||
super.onReady(context);
|
||||
Logger.getGlobal().log(info_logging_level, "On ready activated");
|
||||
if (current_consumer != null) {
|
||||
context.unregisterConsumer(current_consumer.key());
|
||||
Logger.getGlobal().log(info_logging_level, "Unregistering consumer");
|
||||
}
|
||||
Consumer consumer = new Consumer(subscriber_key.get(), broker_topic.get(), new Handler() {
|
||||
@Override
|
||||
public void onMessage(String key, String address, Map body, Message message, Context context) {
|
||||
super.onMessage(key, address, body, message, context);
|
||||
try {
|
||||
Logger.getGlobal().log(info_logging_level, "A message, " + body.toString() + " has arrived at address " + address + " for application " + message.subject());
|
||||
} catch (ClientException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
String old_text = subscriptionTextArea.getText() + "\n";
|
||||
subscriptionTextArea.setText(old_text + body);
|
||||
}
|
||||
}, true, true);
|
||||
context.registerConsumer(consumer);
|
||||
}
|
||||
}, List.of(),
|
||||
List.of(
|
||||
)
|
||||
,
|
||||
false,
|
||||
false,
|
||||
new StaticExnConfig(
|
||||
broker_ip.get(),
|
||||
broker_port.get(),
|
||||
"admin",
|
||||
"admin",
|
||||
60,
|
||||
EMPTY
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
Logger.getGlobal().log(info_logging_level, "Starting private connector");
|
||||
private_connector.start();
|
||||
}
|
||||
});
|
||||
|
||||
JPanel broker_config_panel = new JPanel(new GridBagLayout());
|
||||
GridBagConstraints c = new GridBagConstraints();
|
||||
c.insets = new Insets(5, 5, 5, 5); // This will add 5 pixels of space on all sides of each component
|
||||
|
||||
int init_value_x=0;
|
||||
int init_value_y=0;
|
||||
|
||||
c.gridx = init_value_x;
|
||||
c.gridy = init_value_y;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Broker to subscribe to:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(broker_ipTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Topic to subscribe to:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(TopicTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Port to subscribe at:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(broker_portTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Key to subscribe with:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(ConsumerKeyTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Application to subscribe for:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
broker_config_panel.add(ConsumerApplicationTextField, c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Received text:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
c.gridwidth = 2;
|
||||
c.fill = GridBagConstraints.BOTH;
|
||||
c.weightx = 1.0;
|
||||
c.weighty = 1.0;
|
||||
broker_config_panel.add(new JScrollPane(subscriptionTextArea), c);
|
||||
/*
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
broker_config_panel.add(new JLabel("Topic list:"), c);
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_START;
|
||||
c.gridwidth = 2;
|
||||
c.fill = GridBagConstraints.BOTH;
|
||||
c.weightx = 1.0;
|
||||
c.weighty = 1.0;
|
||||
broker_config_panel.add(new JScrollPane(topic_list_TextArea), c);
|
||||
|
||||
*/
|
||||
|
||||
|
||||
c.gridx = (++init_value_x)%2;
|
||||
c.gridy = (++init_value_y)/2;
|
||||
c.anchor = GridBagConstraints.LINE_END;
|
||||
c.gridwidth = 1;
|
||||
c.fill = GridBagConstraints.NONE;
|
||||
c.weightx = 0.0;
|
||||
c.weighty = 0.0;
|
||||
broker_config_panel.add(subscribeButton, c);
|
||||
|
||||
frame.add(broker_config_panel);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
|
||||
synchronized (new_message_arrived){
|
||||
while(!new_message_arrived.getValue()){
|
||||
try {
|
||||
new_message_arrived.wait();
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
new_message_arrived.setValue(false);
|
||||
String older_text = subscriptionTextArea.getText();
|
||||
subscriptionTextArea.setText(older_text+"\n"+String.valueOf(message_payload));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,46 +1,58 @@
|
||||
package utility_beans;
|
||||
package utility_beans.broker_communication;
|
||||
|
||||
import eu.nebulouscloud.exn.Connector;
|
||||
import eu.nebulouscloud.exn.core.Consumer;
|
||||
import eu.nebulouscloud.exn.core.Handler;
|
||||
import eu.nebulouscloud.exn.core.Context;
|
||||
import eu.nebulouscloud.exn.core.Publisher;
|
||||
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
|
||||
import eu.nebulouscloud.exn.settings.ExnConfig;
|
||||
import org.apache.catalina.util.CustomObjectInputStream;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class ExtendedConnector extends Connector {
|
||||
private ConnectorHandler handler;
|
||||
private CustomConnectorHandler handler;
|
||||
|
||||
private Connector connector;
|
||||
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, boolean enableHealth, ExnConfig configuration) {
|
||||
super(component, handler, publishers, consumers, enableState, enableHealth, configuration);
|
||||
this.handler = handler;
|
||||
this.handler =(CustomConnectorHandler) handler;
|
||||
}
|
||||
|
||||
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, ExnConfig configuration) {
|
||||
super(component, handler, publishers, consumers, enableState, configuration);
|
||||
this.handler = handler;
|
||||
this.handler = (CustomConnectorHandler) handler;
|
||||
}
|
||||
|
||||
public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, ExnConfig configuration) {
|
||||
super(component, handler, publishers, consumers, configuration);
|
||||
this.handler = handler;
|
||||
this.handler = (CustomConnectorHandler) handler;
|
||||
}
|
||||
|
||||
public ConnectorHandler getHandler() {
|
||||
return handler;
|
||||
public CustomConnectorHandler getHandler() {
|
||||
return (CustomConnectorHandler) handler;
|
||||
}
|
||||
|
||||
public void setHandler(ConnectorHandler handler) {
|
||||
public void setHandler(CustomConnectorHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public void remove_consumer_with_key(String key) {
|
||||
try {
|
||||
((CustomConnectorHandler)handler).getContext().unregisterConsumer(key);
|
||||
Context context = ((CustomConnectorHandler)handler).getContext();
|
||||
context.unregisterConsumer(key);
|
||||
}catch (ClassCastException c){
|
||||
Logger.getAnonymousLogger().log(Level.WARNING,"Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
|
||||
}
|
||||
}
|
||||
|
||||
private void remove_publisher_with_key(String key) {
|
||||
try {
|
||||
Context context = ((CustomConnectorHandler)handler).getContext();
|
||||
context.unregisterPublisher(key);
|
||||
}catch (ClassCastException c){
|
||||
Logger.getAnonymousLogger().log(Level.WARNING,"Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
|
||||
}
|
||||
@ -55,6 +67,25 @@ public class ExtendedConnector extends Connector {
|
||||
}
|
||||
}
|
||||
|
||||
public void stop(ArrayList<Consumer> consumers, ArrayList <Publisher> publishers){
|
||||
if (consumers.size()>0) {
|
||||
stop_consumers(consumers);
|
||||
}
|
||||
if (publishers.size()>0) {
|
||||
stop_publishers(publishers);
|
||||
}
|
||||
}
|
||||
public void stop_consumers(ArrayList<Consumer> consumers){
|
||||
for (Consumer consumer : consumers){
|
||||
remove_consumer_with_key(consumer.key());
|
||||
}
|
||||
}
|
||||
public void stop_publishers(ArrayList<Publisher> publishers){
|
||||
for (Publisher publisher : publishers){
|
||||
remove_publisher_with_key(publisher.key());
|
||||
}
|
||||
}
|
||||
|
||||
public Connector getConnector() {
|
||||
return connector;
|
||||
}
|
||||
@ -62,4 +93,5 @@ public class ExtendedConnector extends Connector {
|
||||
public void setConnector(Connector connector) {
|
||||
this.connector = connector;
|
||||
}
|
||||
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package utility_beans;
|
||||
package utility_beans.generic_component_functionality;
|
||||
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import slo_violation_detector_engine.director.DirectorSubcomponent;
|
||||
@ -8,7 +8,7 @@ import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.NAME_SEPARATOR;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadType.*;
|
||||
import static utility_beans.generic_component_functionality.CharacterizedThread.CharacterizedThreadType.*;
|
||||
|
||||
public class CharacterizedThread{
|
||||
public enum CharacterizedThreadType{
|
@ -0,0 +1,5 @@
|
||||
package utility_beans.generic_component_functionality;
|
||||
|
||||
public enum OperationalMode {
|
||||
DIRECTOR,DETECTOR
|
||||
}
|
@ -6,7 +6,7 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.monitoring;
|
||||
|
||||
|
||||
|
@ -6,7 +6,7 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.monitoring;
|
||||
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
|
@ -6,7 +6,7 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.monitoring;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
@ -6,7 +6,7 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.monitoring;
|
||||
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
@ -14,12 +14,10 @@ import utilities.MathUtils;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import static configuration.Constants.*;
|
||||
import static java.lang.Integer.valueOf;
|
||||
import static utility_beans.PredictedMonitoringAttribute.*;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.*;
|
||||
|
||||
public class RealtimeMonitoringAttribute {
|
||||
|
@ -6,7 +6,7 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.synchronization;
|
||||
|
||||
public class SynchronizedBoolean {
|
||||
private Boolean value;
|
@ -6,11 +6,9 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.synchronization;
|
||||
|
||||
|
||||
import utility_beans.SynchronizedBoolean;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
@ -1,4 +1,4 @@
|
||||
package utility_beans;
|
||||
package utility_beans.synchronization;
|
||||
|
||||
public class SynchronizedInteger {
|
||||
private Integer value;
|
@ -6,7 +6,7 @@
|
||||
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
package utility_beans;
|
||||
package utility_beans.synchronization;
|
||||
|
||||
|
||||
import java.util.Collections;
|
@ -1 +1,4 @@
|
||||
spring.profiles.active=production
|
||||
spring.profiles.active=production
|
||||
spring.mvc.view.prefix=/src/main/webapp/WEB-INF/jsp/
|
||||
spring.mvc.view.suffix=.jsp
|
||||
logging.level.org.springframework.web.servlet.view.InternalResourceViewResolver=DEBUG
|
@ -5,7 +5,8 @@ metrics_bounds = avgResponseTime;unbounded;unbounded,custom2;0;3
|
||||
slo_rules_topic = eu.nebulouscloud.monitoring.slo.new
|
||||
single_slo_rule_active = true
|
||||
#broker_ip_url = tcp://localhost:61616?wireFormat.maxInactivityDuration=0
|
||||
broker_ip_url = localhost
|
||||
broker_ip_url = nebulous-activemq
|
||||
broker_port = 5672
|
||||
broker_username = admin
|
||||
broker_password = admin
|
||||
|
||||
|
@ -8,8 +8,8 @@
|
||||
|
||||
//import eu.melodic.event.brokerclient.BrokerPublisher;
|
||||
//import eu.melodic.event.brokerclient.BrokerSubscriber;
|
||||
import utility_beans.BrokerPublisher;
|
||||
import utility_beans.BrokerSubscriber;
|
||||
import utility_beans.broker_communication.BrokerPublisher;
|
||||
import utility_beans.broker_communication.BrokerSubscriber;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
import org.json.simple.parser.ParseException;
|
||||
@ -20,6 +20,7 @@ import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
@ -44,9 +45,9 @@ public class ConnectivityTests {
|
||||
|
||||
prop.load(inputStream);
|
||||
|
||||
BrokerPublisher publisher = new BrokerPublisher("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
BrokerPublisher publisher = new BrokerPublisher("test_topic",prop.getProperty("broker_ip_url"), Integer.parseInt(prop.getProperty("broker_port")),prop.getProperty("broker_username"),prop.getProperty("broker_password"), amq_library_configuration_location);
|
||||
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber("test_topic",prop.getProperty("broker_ip_url"),prop.getProperty("broker_username"),prop.getProperty("broker_password"),amq_library_configuration_location,default_application_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber("test_topic",prop.getProperty("broker_ip_url"), Integer.parseInt(prop.getProperty("broker_port")),prop.getProperty("broker_username"),prop.getProperty("broker_password"),amq_library_configuration_location,default_application_name);
|
||||
|
||||
JSONObject object_to_publish = new JSONObject();
|
||||
object_to_publish.put("ram","95");
|
||||
@ -70,12 +71,12 @@ public class ConnectivityTests {
|
||||
};
|
||||
|
||||
Thread subscription_thread = new Thread(() -> {
|
||||
subscriber.subscribe(slo_function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
subscriber.subscribe(slo_function,default_application_name,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
});
|
||||
subscription_thread.start();
|
||||
//slo_bound_running_threads.put("Test topic subscription thread",subscription_thread);
|
||||
|
||||
publisher.publish(object_to_publish.toJSONString());
|
||||
publisher.publish(object_to_publish.toJSONString(), Collections.singleton(default_application_name));
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
}catch (InterruptedException i){
|
||||
|
@ -8,17 +8,17 @@
|
||||
|
||||
import org.junit.Test;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.MonitoringAttributeStatistics;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.MonitoringAttributeStatistics;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static configuration.Constants.default_application_name;
|
||||
import static configuration.Constants.roc_limit;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getAttributes_maximum_rate_of_change;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getAttributes_minimum_rate_of_change;
|
||||
import static utility_beans.RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.getAttributes_maximum_rate_of_change;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.getAttributes_minimum_rate_of_change;
|
||||
import static utility_beans.monitoring.RealtimeMonitoringAttribute.simple_initialize_0_100_bounded_attributes;
|
||||
|
||||
public class DerivedMonitoringAttributeTests {
|
||||
|
||||
|
@ -10,10 +10,10 @@ import org.junit.Test;
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utilities.SLOViolationCalculator;
|
||||
import utility_beans.CharacterizedThread;
|
||||
import utility_beans.MonitoringAttributeStatistics;
|
||||
import utility_beans.RealtimeMonitoringAttribute;
|
||||
import utility_beans.PredictedMonitoringAttribute;
|
||||
import utility_beans.generic_component_functionality.CharacterizedThread;
|
||||
import utility_beans.monitoring.MonitoringAttributeStatistics;
|
||||
import utility_beans.monitoring.RealtimeMonitoringAttribute;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
|
@ -12,8 +12,9 @@
|
||||
//import eu.melodic.event.brokerclient.templates.TopicNames;
|
||||
|
||||
import slo_violation_detector_engine.detector.DetectorSubcomponent;
|
||||
import utility_beans.*;
|
||||
import utility_beans.BrokerSubscriber.*;
|
||||
import utility_beans.broker_communication.BrokerPublisher;
|
||||
import utility_beans.broker_communication.BrokerSubscriber;
|
||||
import utility_beans.broker_communication.BrokerSubscriber.*;
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.parser.JSONParser;
|
||||
@ -23,16 +24,16 @@ import org.junit.Test;
|
||||
import slo_rule_modelling.SLORule;
|
||||
import slo_rule_modelling.SLOSubRule;
|
||||
import utilities.MonitoringAttributeUtilities;
|
||||
import utility_beans.broker_communication.BrokerSubscriptionDetails;
|
||||
import utility_beans.monitoring.PredictedMonitoringAttribute;
|
||||
import utility_beans.synchronization.SynchronizedBoolean;
|
||||
|
||||
import java.io.*;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.BiFunction;
|
||||
@ -42,8 +43,8 @@ import java.util.logging.Logger;
|
||||
import static configuration.Constants.*;
|
||||
import static slo_rule_modelling.SLORule.process_rule_value;
|
||||
import static slo_violation_detector_engine.detector.DetectorSubcomponentUtilities.initialize_subrule_and_attribute_associations;
|
||||
import static utility_beans.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
import static utility_beans.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
import static utility_beans.generic_component_functionality.CharacterizedThread.CharacterizedThreadRunMode.detached;
|
||||
import static utility_beans.monitoring.PredictedMonitoringAttribute.getPredicted_monitoring_attributes;
|
||||
|
||||
class MetricConfiguration{
|
||||
public String name;
|
||||
@ -92,6 +93,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
prop.load(inputStream);
|
||||
|
||||
String broker_ip_address = prop.getProperty("broker_ip_address");
|
||||
int broker_port = Integer.parseInt(prop.getProperty("broker_port"));
|
||||
String broker_username = prop.getProperty("broker_username");
|
||||
String broker_password = prop.getProperty("broker_password");
|
||||
|
||||
@ -114,9 +116,9 @@ public class UnboundedMonitoringAttributeTests {
|
||||
for (String metric_name : metric_names) {
|
||||
MonitoringAttributeUtilities.initialize_values(metric_name, detector.getSubcomponent_state());
|
||||
|
||||
String realtime_metric_topic_name = TopicNames.realtime_metric_values_topic(metric_name);
|
||||
String realtime_metric_topic_name = topic_prefix_realtime_metrics+metric_name;
|
||||
Logger.getGlobal().log(Level.INFO, "Starting realtime subscription at " + realtime_metric_topic_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_username, broker_password, amq_library_configuration_location,default_application_name);
|
||||
BrokerSubscriber subscriber = new BrokerSubscriber(realtime_metric_topic_name, broker_ip_address, broker_port,broker_username, broker_password, amq_library_configuration_location,default_application_name);
|
||||
BiFunction<String, String, String> function = (topic, message) -> {
|
||||
synchronized (detector.getSubcomponent_state().getMonitoring_attributes().get(topic)) {
|
||||
try {
|
||||
@ -130,25 +132,25 @@ public class UnboundedMonitoringAttributeTests {
|
||||
return message;
|
||||
};
|
||||
Thread realtime_subscription_thread = new Thread(() -> {
|
||||
subscriber.subscribe(function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
subscriber.subscribe(function,default_application_name,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
// Insert some method call here.
|
||||
});
|
||||
realtime_subscription_thread.start();
|
||||
running.add(realtime_subscription_thread);
|
||||
|
||||
|
||||
String forecasted_metric_topic_name = TopicNames.final_metric_predictions_topic(metric_name);
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_username,broker_password, amq_library_configuration_location,default_application_name);
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> forecasted_function = (broker_details,message) ->{
|
||||
String forecasted_metric_topic_name = topic_prefix_final_predicted_metrics+metric_name;
|
||||
BrokerSubscriber forecasted_subscriber = new BrokerSubscriber(forecasted_metric_topic_name, broker_ip_address,broker_port,broker_username,broker_password, amq_library_configuration_location,default_application_name);
|
||||
BiFunction<BrokerSubscriptionDetails,String,String> forecasted_function = (broker_details, message) ->{
|
||||
HashMap<Integer, HashMap<Long, PredictedMonitoringAttribute>> predicted_attributes = getPredicted_monitoring_attributes();
|
||||
try {
|
||||
double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.metricValue.name())).doubleValue();
|
||||
double probability_confidence = 100*((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.probability)).doubleValue();
|
||||
double forecasted_value = ((Number)((JSONObject)new JSONParser().parse(message)).get("metricValue")).doubleValue();
|
||||
double probability_confidence = 100*((Number)((JSONObject)new JSONParser().parse(message)).get("probability")).doubleValue();
|
||||
//double confidence_interval = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.confidence_interval)).doubleValue();
|
||||
JSONArray json_array_confidence_interval = ((JSONArray)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.confidence_interval));
|
||||
JSONArray json_array_confidence_interval = ((JSONArray)((JSONObject)new JSONParser().parse(message)).get("confidence_interval"));
|
||||
double confidence_interval = ((Number)json_array_confidence_interval.get(1)).doubleValue() - ((Number)json_array_confidence_interval.get(0)).doubleValue();
|
||||
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.timestamp)).longValue();
|
||||
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get(EventFields.PredictionMetricEventFields.predictionTime.name())).longValue();
|
||||
long timestamp = ((Number)((JSONObject)new JSONParser().parse(message)).get("timestamp")).longValue();
|
||||
long targeted_prediction_time = ((Number)((JSONObject)new JSONParser().parse(message)).get("predictionTime")).longValue();
|
||||
Logger.getGlobal().log(info_logging_level,"RECEIVED message with predicted value for "+ metric_name +" equal to "+ forecasted_value);
|
||||
|
||||
synchronized (detector.can_modify_slo_rules) {
|
||||
@ -192,7 +194,7 @@ public class UnboundedMonitoringAttributeTests {
|
||||
Thread forecasted_subscription_thread = new Thread(() -> {
|
||||
synchronized (detector.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name)) {
|
||||
//if (Main.HAS_MESSAGE_ARRIVED.get_synchronized_boolean(forecasted_metric_topic_name).getValue())
|
||||
forecasted_subscriber.subscribe(forecasted_function,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
forecasted_subscriber.subscribe(forecasted_function,default_application_name,new AtomicBoolean(false)); //will be a short-lived test, so setting stop signal to false
|
||||
}
|
||||
});
|
||||
running.add(forecasted_subscription_thread);
|
||||
@ -243,8 +245,8 @@ public class UnboundedMonitoringAttributeTests {
|
||||
}
|
||||
|
||||
private static void perpetual_metric_publisher(String metric_name, double base_metric_value, double forecasted_metric_value, double confidence_interval, double probability, double metric_max_value, int publish_interval_in_milliseconds) {
|
||||
BrokerPublisher realtime_data_publisher = new BrokerPublisher(metric_name, "tcp://localhost:61616", "admin", "admin","src/main/resources/config/eu.melodic.event.brokerclient.properties");
|
||||
BrokerPublisher forecasted_data_publisher = new BrokerPublisher("prediction."+metric_name, "tcp://localhost:61616", "admin", "admin","src/main/resources/config/eu.melodic.event.brokerclient.properties");
|
||||
BrokerPublisher realtime_data_publisher = new BrokerPublisher(metric_name, "tcp://localhost:61616", 5672,"admin", "admin","src/main/resources/config/eu.melodic.event.brokerclient.properties");
|
||||
BrokerPublisher forecasted_data_publisher = new BrokerPublisher("prediction."+metric_name, "tcp://localhost:61616", 5672,"admin", "admin","src/main/resources/config/eu.melodic.event.brokerclient.properties");
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
@ -253,19 +255,19 @@ public class UnboundedMonitoringAttributeTests {
|
||||
double random_value = ThreadLocalRandom.current().nextDouble();
|
||||
realtime_metric_json_object.put("metricValue", base_metric_value+random_value*(metric_max_value-base_metric_value));
|
||||
realtime_metric_json_object.put("timestamp",System.currentTimeMillis());
|
||||
realtime_data_publisher.publish(realtime_metric_json_object.toJSONString());
|
||||
realtime_data_publisher.publish(realtime_metric_json_object.toJSONString(), Collections.singleton(default_application_name));
|
||||
|
||||
JSONObject forecasted_metric_json_object = new JSONObject();
|
||||
forecasted_metric_json_object.put("metricValue", forecasted_metric_value);
|
||||
forecasted_metric_json_object.put("timestamp",System.currentTimeMillis());
|
||||
forecasted_metric_json_object.put("probability",probability);
|
||||
forecasted_metric_json_object.put(EventFields.PredictionMetricEventFields.predictionTime.name(),targeted_prediction_time);
|
||||
forecasted_metric_json_object.put("predictionTime",targeted_prediction_time);
|
||||
//((System.currentTimeMillis()/1000)%60)*60000+1); //The prediction supposedly reflects the metric values at the next minute
|
||||
JSONArray confidence_interval_list = new JSONArray();
|
||||
confidence_interval_list.add((forecasted_metric_value-confidence_interval/2));
|
||||
confidence_interval_list.add((forecasted_metric_value+confidence_interval/2));
|
||||
forecasted_metric_json_object.put("confidence_interval",confidence_interval_list);
|
||||
forecasted_data_publisher.publish(forecasted_metric_json_object.toJSONString());
|
||||
forecasted_data_publisher.publish(forecasted_metric_json_object.toJSONString(), Collections.singleton(default_application_name));
|
||||
Thread.sleep(publish_interval_in_milliseconds);
|
||||
|
||||
}catch (InterruptedException i){
|
||||
|
Loading…
x
Reference in New Issue
Block a user