From 72cfafcaf8fd108ffe627ae0e9f2eccd820018d4 Mon Sep 17 00:00:00 2001
From: ipatini <ipatini@mail.ntua.gr>
Date: Wed, 15 May 2024 17:59:06 +0300
Subject: [PATCH] RD: Bug fix in SAL registration service

Change-Id: I2e821466b2b29dd38d94916c269082abb54f978b
---
 .../broker_communication/BrokerPublisher.java | 13 +++++++++---
 .../SynchronousBrokerPublisher.java           | 13 +++++++++---
 .../discovery/monitor/DeviceProcessor.java    | 21 ++++++++++++++++++-
 .../service/SALRegistrationService.java       | 15 +++++++++++++
 4 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java
index 03e8665..852167c 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java
@@ -26,9 +26,13 @@ public class BrokerPublisher {
     private int broker_port;
 
     public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
+        this(topic,broker_ip,broker_port,brokerUsername,brokerPassword,amqLibraryConfigurationLocation,false);
+    }
+    public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, boolean hard_initialize_connector) {
         boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
 
         if (!able_to_initialize_BrokerPublisher){
+            log.error("Could not initialize BrokerPublisher");
             return;
         }
         boolean publisher_configuration_changed;
@@ -48,7 +52,7 @@ public class BrokerPublisher {
         }
 
 
-        if (publisher_configuration_changed){
+        if (publisher_configuration_changed || hard_initialize_connector){
 //            for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
             log.info("Publisher configuration changed, creating new connector at  "+broker_ip+" for topic "+topic);
             if (active_connector!=null) {
@@ -99,12 +103,15 @@ public class BrokerPublisher {
             } catch (ParseException p) {
                 log.warn( "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content);
             }
-            if (private_publisher_instance != null) {
+            if (!is_publisher_null()) {
                 private_publisher_instance.send(json_object);
                 log.info("Sent new message\n"+json_object.toJSONString());
             } else {
-                log.error( "Could not send message to AMQP broker, as the broker ip to be used has not been specified");
+                log.error( "Could not send message to AMQP broker, as the publisher instance is null");
             }
         }
     }
+    public boolean is_publisher_null(){
+        return (private_publisher_instance == null);
+    }
 }
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java
index 5cdf4b9..3db8fa6 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java
@@ -22,12 +22,16 @@ public class SynchronousBrokerPublisher {
     private ExtendedConnector active_connector;
     private String topic;
     private String broker_ip;
-
     public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
+        this(topic, broker_ip, broker_port, brokerUsername, brokerPassword, amqLibraryConfigurationLocation,false);
+    }
+
+    public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation,boolean hard_initialize_connector) {
 
         boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
 
         if (!able_to_initialize_BrokerPublisher){
+            log.error("Unable to initialize SynchronousBrokerPublisher");
             return;
         }
         boolean publisher_configuration_changed;
@@ -49,7 +53,7 @@ public class SynchronousBrokerPublisher {
         }
 
         //log.error("preliminary_outside");
-        if (publisher_configuration_changed){
+        if (publisher_configuration_changed || hard_initialize_connector){
             //log.error("preliminary_inside1");
 //            for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
             log.info("Publisher configuration changed, creating new connector at  "+broker_ip+" for topic "+topic);
@@ -71,7 +75,7 @@ public class SynchronousBrokerPublisher {
             }
             //CustomConnectorHandler custom_handler = new CustomConnectorHandler();
 
-            active_connector = new ExtendedConnector("resource_manager"
+            active_connector = new ExtendedConnector("resource_manager_synchronous"
                     , new CustomConnectorHandler() {}
                     , publishers
                     , List.of(),
@@ -146,4 +150,7 @@ public class SynchronousBrokerPublisher {
         }
         return reply;
     }
+    public boolean is_publisher_null(){
+        return (private_publisher_instance == null);
+    }
 }
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java
index 8fe87bb..4b8d893 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java
@@ -138,7 +138,26 @@ public class DeviceProcessor  implements InitializingBean {
                 lost_device_message.put("device_name",device.getName());
                 Clock clock = Clock.systemUTC();
                 lost_device_message.put("timestamp",(int)(clock.millis()/1000));
-                BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(),processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+                log.info("Creating new BrokerPublisher to publish device lost message");
+                BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+                int sending_attempt = 1;
+                while (device_lost_publisher.is_publisher_null()){
+
+                    try {
+                        log.info("Attempting to recreate new BrokerPublisher to publish the device lost message");
+                        log.info("The topic name is "+processorProperties.getLost_device_topic()+", the broker ip is "+ processorProperties.getNebulous_broker_ip_address()+", the broker port is "+ processorProperties.getNebulous_broker_port()+", the username is "+ processorProperties.getNebulous_broker_username()+", and the password is "+ processorProperties.getNebulous_broker_password());
+                        if (sending_attempt<=2) {
+                            device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+                        }else{
+                            log.warn("Will now attempt to reset the BrokerPublisher connector");
+                            device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "",true);
+                        }
+                        Thread.sleep(3000);
+                    }catch (InterruptedException i){
+                        i.printStackTrace();
+                    }
+                    sending_attempt++;
+                }
                 device_lost_publisher.publish(lost_device_message.toJSONString(), Collections.singleton(""));
                 log.warn("processFailedDevices: Marked as FAILED device with Id: {}", device.getId());
             }
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java
index 607cb1f..153ac91 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java
@@ -110,6 +110,21 @@ public class SALRegistrationService implements InitializingBean {
         //ArrayList<String> applications = get_running_applications(sal_running_applications_reply);
         //for (String application_name:applications) {
         SynchronousBrokerPublisher register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(),processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+        int sending_attempt = 1;
+        while (register_device_publisher.is_publisher_null()){
+            if (sending_attempt<=2) {
+                register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+            }else{
+                log.warn("Will now attempt to reset the Synchronous publisher connector");
+                register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+            }
+            try {
+                Thread.sleep(3000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            sending_attempt++;
+        }
         //TODO handle the response here
         Map response = register_device_publisher.publish_for_response(register_device_message_string, Collections.singleton(application_name));
         log.info("The response received while trying to register device " + device_name + " is "+response.toString());