From a7bad9a798a9d6398d6f65f3dff707e523044221 Mon Sep 17 00:00:00 2001
From: ipatini <ipatini@mail.ntua.gr>
Date: Tue, 2 Apr 2024 20:41:39 +0300
Subject: [PATCH] Resource Manager: Copied changes from branch
 'add-sal-connectivity'

Change-Id: I9b39e95b117331e0b5d764938b7701c183986fba
---
 .../.mvn/wrapper/maven-wrapper.properties     |   2 +-
 resource-discovery/Dockerfile                 |  13 +-
 resource-discovery/pom.xml                    |  81 +++-
 resource-discovery/run.sh                     |   2 +-
 .../ResourceDiscoveryProperties.java          |   7 +
 .../broker_communication/BrokerPublisher.java | 110 ++++++
 .../BrokerSubscriber.java                     | 194 ++++++++++
 .../BrokerSubscriptionDetails.java            |  79 ++++
 .../CustomConnectorHandler.java               |  24 ++
 .../ExtendedConnector.java                    |  99 +++++
 .../broker_communication/JsonFileParser.java  |  27 ++
 .../broker_communication/SALCommunicator.java | 355 ++++++++++++++++++
 .../SynchronousBrokerPublisher.java           | 149 ++++++++
 .../discovery/monitor/DeviceProcessor.java    |  10 +
 .../DeviceManagementController.java           |   4 +
 .../discovery/monitor/model/Device.java       |   2 +
 .../service/DeviceManagementService.java      |   2 +
 .../UnknownDeviceRegistrationService.java     |  14 +-
 .../RegistrationRequestProcessor.java         |   6 +-
 .../discovery/registration/model/Device.java  |   1 +
 .../model/RegistrationRequest.java            |   7 +-
 .../service/SALRegistrationService.java       | 117 ++++++
 .../src/main/resources/application.yml        |   7 +
 .../archived-device-view.html                 |   7 +
 .../archived-request-view.html                |   7 +
 .../freebees_webdesign_6/device-view.html     |   7 +
 .../freebees_webdesign_6/request-edit.html    |   7 +
 27 files changed, 1317 insertions(+), 23 deletions(-)
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriptionDetails.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/ExtendedConnector.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/JsonFileParser.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java
 create mode 100644 resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java

diff --git a/resource-discovery/.mvn/wrapper/maven-wrapper.properties b/resource-discovery/.mvn/wrapper/maven-wrapper.properties
index 2e76e18..e70e7bc 100644
--- a/resource-discovery/.mvn/wrapper/maven-wrapper.properties
+++ b/resource-discovery/.mvn/wrapper/maven-wrapper.properties
@@ -1,2 +1,2 @@
-distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.4/apache-maven-3.9.4-bin.zip
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
 wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
diff --git a/resource-discovery/Dockerfile b/resource-discovery/Dockerfile
index 7f2a763..4f1e381 100644
--- a/resource-discovery/Dockerfile
+++ b/resource-discovery/Dockerfile
@@ -1,9 +1,6 @@
 
-ARG BUILDER_IMAGE=docker.io/library/maven:3.9.5-eclipse-temurin-17
-ARG RUN_IMAGE=docker.io/library/eclipse-temurin:17.0.8.1_1-jre
-
 # ----------------- Builder image -----------------
-FROM docker.io/library/maven:3.9.5-eclipse-temurin-17 as rd-builder
+FROM docker.io/library/maven:3.9.6-eclipse-temurin-21 as rd-builder
 ENV BASEDIR /app
 WORKDIR ${BASEDIR}
 COPY src        ${BASEDIR}/src
@@ -13,7 +10,7 @@ RUN mvn -f ${BASEDIR}/pom.xml -DskipTests clean install && \
     java -Djarmode=layertools -jar ${BASEDIR}/target/resource-discovery-*.jar extract
 
 # -----------------   Runtime image   -----------------
-FROM docker.io/library/eclipse-temurin:17.0.8.1_1-jre
+FROM docker.io/library/eclipse-temurin:21.0.1_12-jre
 
 # Setup environment
 ENV BASEDIR /opt/resource-discovery
@@ -26,9 +23,9 @@ RUN wget  --progress=dot:giga -O /usr/local/bin/dumb-init \
 
 # Add RD user
 ARG RD_USER=rd
-RUN mkdir ${RD_HOME} ; \
-    addgroup ${RD_USER} ; \
-    adduser --home ${RD_HOME} --no-create-home --ingroup ${RD_USER} --disabled-password ${RD_USER} ; \
+RUN mkdir ${RD_HOME} && \
+    addgroup ${RD_USER} && \
+    adduser --home ${RD_HOME} --no-create-home --ingroup ${RD_USER} --disabled-password ${RD_USER} && \
     chown ${RD_USER}:${RD_USER} ${RD_HOME}
 
 # Set User and Workdir
diff --git a/resource-discovery/pom.xml b/resource-discovery/pom.xml
index bf395bb..ff87f62 100644
--- a/resource-discovery/pom.xml
+++ b/resource-discovery/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>org.springframework.boot</groupId>
 		<artifactId>spring-boot-starter-parent</artifactId>
-		<version>3.1.4</version>
+		<version>3.2.1</version>
 		<relativePath/> <!-- lookup parent from repository -->
 	</parent>
 
@@ -17,11 +17,19 @@
 	<description>Nebulous resource discovery service</description>
 
 	<properties>
-		<java.version>17</java.version>
+		<java.version>21</java.version>
 		<imageName>${project.artifactId}:${project.version}</imageName>
 	</properties>
 
 	<dependencies>
+
+		<dependency>
+			<groupId>eu.nebulouscloud</groupId>
+			<artifactId>exn-connector-java</artifactId>
+			<version>1.0-SNAPSHOT</version>
+		</dependency>
+
+
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-security</artifactId>
@@ -45,12 +53,12 @@
 		<dependency>
 			<groupId>org.projectlombok</groupId>
 			<artifactId>lombok</artifactId>
-			<version>1.18.26</version>
+			<version>1.18.30</version>
 		</dependency>
 		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
-			<version>3.13.0</version>
+			<version>3.14.0</version>
 		</dependency>
 
 		<dependency>
@@ -72,6 +80,57 @@
 			<groupId>com.fasterxml.jackson.datatype</groupId>
 			<artifactId>jackson-datatype-jsr310</artifactId>
 		</dependency>
+
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1.1</version>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+			<version>2.15.1</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpcore</artifactId>
+			<version>4.4.16</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>4.5.14</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpmime</artifactId>
+			<version>4.5.14</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>org.springframework.boot</groupId>
+					<artifactId>spring-boot-starter-logging</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-web</artifactId>
+			<exclusions>
+				<exclusion>
+					<groupId>org.springframework.boot</groupId>
+					<artifactId>spring-boot-starter-logging</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
 	</dependencies>
 
 	<dependencyManagement>
@@ -79,7 +138,7 @@
 			<dependency>
 				<groupId>org.yaml</groupId>
 				<artifactId>snakeyaml</artifactId>
-				<version>2.0</version>
+				<version>2.2</version>
 			</dependency>
 		</dependencies>
 	</dependencyManagement>
@@ -93,6 +152,18 @@
 		</plugins>
 	</build>
 
+
+	<repositories>
+		<repository>
+			<id>maven-central</id>
+			<url>https://repo1.maven.org/maven2/</url>
+		</repository>
+		<repository>
+			<id>nexus-nebulous</id>
+			<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
+		</repository>
+	</repositories>
+
 	<!-- Creating Docker image with BUILDPACKS -->
 	<!--<build>
 		<plugins>
diff --git a/resource-discovery/run.sh b/resource-discovery/run.sh
index 1dc69b8..54cd70f 100644
--- a/resource-discovery/run.sh
+++ b/resource-discovery/run.sh
@@ -43,7 +43,7 @@ ${JRE} \
     $JAVA_ADD_OPENS \
     -Djasypt.encryptor.password=$JASYPT_PASSWORD \
     -Djava.security.egd=file:/dev/urandom \
-    org.springframework.boot.loader.JarLauncher \
+    org.springframework.boot.loader.launch.JarLauncher \
     $* &
 
 # Get PID and wait it to exit
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java
index 1a9ba44..69610be 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/ResourceDiscoveryProperties.java
@@ -85,6 +85,13 @@ public class ResourceDiscoveryProperties {
 	// Users
 	private List<UserData> users;
 
+	// Nebulous broker subscription details
+	private String nebulous_broker_ip_address;
+	private int nebulous_broker_port;
+	private String nebulous_broker_username;
+	private String lost_device_topic;
+	private String nebulous_broker_password;
+
 	@Data
 	public static class UserData {
 		private final String username;
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java
new file mode 100644
index 0000000..03e8665
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerPublisher.java
@@ -0,0 +1,110 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import eu.nebulouscloud.exn.Connector;
+import eu.nebulouscloud.exn.core.Publisher;
+import eu.nebulouscloud.exn.handlers.ConnectorHandler;
+import eu.nebulouscloud.exn.settings.StaticExnConfig;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.*;
+
+
+@Slf4j
+public class BrokerPublisher {
+    public static String EMPTY="";
+    private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
+    private Publisher private_publisher_instance;
+    private ArrayList<Publisher> publishers = new ArrayList<>();
+
+    private ExtendedConnector active_connector;
+    private String topic;
+    private String broker_ip;
+    private int broker_port;
+
+    public BrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
+        boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
+
+        if (!able_to_initialize_BrokerPublisher){
+            return;
+        }
+        boolean publisher_configuration_changed;
+        if (!broker_and_topics_to_publish_to.containsKey(broker_ip)){
+            HashSet<String> topics_to_publish_to = new HashSet<>();
+            topics_to_publish_to.add(topic);
+            broker_and_topics_to_publish_to.put(broker_ip,topics_to_publish_to);
+            publisher_configuration_changed = true;
+        }else{
+            if (!broker_and_topics_to_publish_to.get(broker_ip).contains(topic)){
+                broker_and_topics_to_publish_to.get(broker_ip).add(topic);
+                publisher_configuration_changed = true;
+            }
+            else{
+                publisher_configuration_changed = false;
+            }
+        }
+
+
+        if (publisher_configuration_changed){
+//            for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
+            log.info("Publisher configuration changed, creating new connector at  "+broker_ip+" for topic "+topic);
+            if (active_connector!=null) {
+                active_connector.stop(new ArrayList<>(), publishers);
+            }
+            publishers.clear();
+            for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
+                //ArrayList<Publisher> publishers = new ArrayList<>();
+                Publisher publisher = new Publisher("resource_manager_"+broker_topic, broker_topic, true, true);
+                publishers.add(publisher);
+                if (broker_topic.equals(topic)){
+                    this.private_publisher_instance = publishers.get(publishers.size()-1);
+                    this.topic = broker_topic;
+                    this.broker_ip = broker_ip;
+                    this.broker_port = broker_port;
+                }
+            }
+            //CustomConnectorHandler custom_handler = new CustomConnectorHandler();
+
+            active_connector = new ExtendedConnector("resource_manager"
+                    , new CustomConnectorHandler() {}
+                    , publishers
+                    , List.of(),
+                    false,
+                    false,
+                    new StaticExnConfig(
+                            broker_ip,
+                            broker_port,
+                            brokerUsername,
+                            brokerPassword,
+                            60,
+                            EMPTY
+                    )
+            );
+            active_connector.start();
+
+        }
+    }
+
+    //TODO The methods below assume that the only content to be sent is json-like
+    public void publish (String json_string_content, Collection<String> application_names){
+
+        for (String application_name : application_names) {
+            JSONParser parser = new JSONParser();
+            JSONObject json_object = new JSONObject();
+            try {
+                json_object = (JSONObject) parser.parse(json_string_content);
+            } catch (ParseException p) {
+                log.warn( "Could not parse the string content to be published to the broker as json, which is the following: "+json_string_content);
+            }
+            if (private_publisher_instance != null) {
+                private_publisher_instance.send(json_object);
+                log.info("Sent new message\n"+json_object.toJSONString());
+            } else {
+                log.error( "Could not send message to AMQP broker, as the broker ip to be used has not been specified");
+            }
+        }
+    }
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java
new file mode 100644
index 0000000..7ff11d5
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriber.java
@@ -0,0 +1,194 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import eu.nebulouscloud.exn.core.Consumer;
+import eu.nebulouscloud.exn.core.Context;
+import eu.nebulouscloud.exn.core.Handler;
+import eu.nebulouscloud.exn.settings.StaticExnConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.qpid.protonj2.client.Message;
+import org.json.simple.JSONValue;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+
+import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY;
+
+@Slf4j
+public class BrokerSubscriber {
+
+    private class MessageProcessingHandler extends Handler {
+        private BrokerSubscriptionDetails broker_details;
+        private static final BiFunction temporary_function = (Object o, Object o2) -> {
+            //System.out.println("");
+            log.info("REPLACE_TEMPORARY_HANDLING_FUNCTIONALITY");
+            return "IN_PROCESSING";
+        };
+        private BiFunction<BrokerSubscriptionDetails, String, String> processing_function;
+
+        @Override
+        public void onMessage(String key, String address, Map body, Message message, Context context) {
+            log.info("Handling message for address " + address);
+            processing_function.apply(broker_details, JSONValue.toJSONString(body));
+        }
+
+        public MessageProcessingHandler(BrokerSubscriptionDetails broker_details) {
+            this.broker_details = broker_details;
+            this.processing_function = temporary_function;
+        }
+
+        public MessageProcessingHandler(BiFunction<BrokerSubscriptionDetails, String, String> biFunction, BrokerSubscriptionDetails broker_details) {
+            this.broker_details = broker_details;
+            this.processing_function = biFunction;
+        }
+
+        public BiFunction getProcessing_function() {
+            return processing_function;
+        }
+
+        public void setProcessing_function(BiFunction processing_function) {
+            this.processing_function = processing_function;
+        }
+    }
+
+    private static HashMap<String, HashSet<String>> broker_and_topics_to_subscribe_to = new HashMap<>();
+    private static HashMap<String, HashMap<String, Consumer>> active_consumers_per_topic_per_broker_ip = new HashMap<>();
+    private static HashMap<String, ExtendedConnector> current_connectors = new HashMap<>();
+    private String topic;
+    private String broker_ip;
+    private int broker_port;
+    private String brokerUsername;
+    private String brokerPassword;
+    BrokerSubscriptionDetails broker_details;
+
+    public BrokerSubscriber(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation, String application_name) {
+        boolean able_to_initialize_BrokerSubscriber = topic != null && broker_ip != null && brokerUsername != null && brokerPassword != null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
+
+        if (!able_to_initialize_BrokerSubscriber) {
+            try {
+                throw new Exception("Unable to initialize Subscriber");
+            } catch (Exception e) {
+                String message = "Topic is " + topic + " broker ip is " + broker_ip + " broker username/pass are " + brokerUsername + "," + brokerPassword;
+
+                log.info(message);
+                throw new RuntimeException(e);
+            }
+        }
+        broker_details = new BrokerSubscriptionDetails(broker_ip, broker_port, brokerUsername, brokerPassword, application_name, topic);
+        boolean subscriber_configuration_changed;
+        if (!broker_and_topics_to_subscribe_to.containsKey(broker_ip)) {
+            HashSet<String> topics_to_subscribe_to = new HashSet<>();
+            //topics_to_subscribe_to.add(realtime_metric_topic_name);
+            //topics_to_subscribe_to.add(forecasted_metric_topic_name);
+            //topics_to_subscribe_to.add();
+            topics_to_subscribe_to.add(topic);
+            broker_and_topics_to_subscribe_to.put(broker_ip, new HashSet<>());
+            active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
+            broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
+
+            subscriber_configuration_changed = true;
+        } else {
+            if (!broker_and_topics_to_subscribe_to.get(broker_ip).contains(topic)) {
+                broker_and_topics_to_subscribe_to.get(broker_ip).add(topic);
+                subscriber_configuration_changed = true;
+            } else {
+                subscriber_configuration_changed = false;
+            }
+        }
+        if (subscriber_configuration_changed) {
+            Consumer current_consumer;
+            if (application_name != null && !application_name.equals(EMPTY)) { //Create a consumer for one application
+                log.info("APP level subscriber " + topic);
+                current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), application_name, true, true);
+            } else { //Allow the consumer to get information from any publisher
+                current_consumer = new Consumer(topic, topic, new MessageProcessingHandler(broker_details), true, true);
+                log.info("HIGH level subscriber " + topic);
+            }
+            active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, current_consumer);
+
+            this.topic = topic;
+            this.broker_ip = broker_ip;
+            this.broker_port = broker_port;
+            this.brokerUsername = brokerUsername;
+            this.brokerPassword = brokerPassword;
+            add_topic_consumer_to_broker_connector(current_consumer);
+        }
+    }
+
+    /**
+     * This method updates the global connector of Resource manager to the AMQP server, by adding support for one more component
+     */
+    private void add_topic_consumer_to_broker_connector(Consumer new_consumer) {
+        if (current_connectors.get(broker_ip) != null) {
+            current_connectors.get(broker_ip).add_consumer(new_consumer);
+        } else {
+            ArrayList<Consumer> consumers = new ArrayList<>();
+            consumers.add(new_consumer);
+            ExtendedConnector extended_connector = new ExtendedConnector("resource_manager",
+                    new CustomConnectorHandler() {
+                    },
+                    List.of(),
+                    consumers,
+                    false,
+                    false,
+                    new StaticExnConfig(
+                            broker_ip,
+                            broker_port,
+                            brokerUsername,
+                            brokerPassword,
+                            60,
+                            EMPTY
+                    )
+            );
+            extended_connector.start();
+            current_connectors.put(broker_ip, extended_connector);
+        }
+    }
+
+    private void remove_topic_from_broker_connector(String topic_key) {
+        if (current_connectors.get(broker_ip) != null) {
+            current_connectors.get(broker_ip).remove_consumer_with_key(topic_key);
+        }
+    }
+
+    public int subscribe(BiFunction function, String application_name, AtomicBoolean stop_signal) {
+        int exit_status = -1;
+        log.info("ESTABLISHING SUBSCRIPTION for " + topic);
+        //First remove any leftover consumer
+        if (active_consumers_per_topic_per_broker_ip.containsKey(broker_ip)) {
+            active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
+            remove_topic_from_broker_connector(topic);
+        } else {
+            active_consumers_per_topic_per_broker_ip.put(broker_ip, new HashMap<>());
+        }
+        //Then add the new consumer
+        Consumer new_consumer;
+        if (application_name != null && !application_name.equals(EMPTY)) {
+            new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), application_name,
+                    true, true);
+        } else {
+            new_consumer = new Consumer(topic, topic, new MessageProcessingHandler(function, broker_details), true, true);
+        }
+        new_consumer.setProperty("topic", topic);
+        active_consumers_per_topic_per_broker_ip.get(broker_ip).put(topic, new_consumer);
+        add_topic_consumer_to_broker_connector(new_consumer);
+
+        log.info("ESTABLISHED SUBSCRIPTION to topic " + topic);
+        synchronized (stop_signal) {
+            while (!stop_signal.get()) {
+                try {
+                    stop_signal.wait();
+                } catch (Exception e) {
+                    log.warn( e.toString() + " in thread " + Thread.currentThread().getName());
+                    break;
+                }
+            }
+            log.info("Stopping subscription for broker " + broker_ip + " and topic " + topic + "at thread " + Thread.currentThread().getName());
+            stop_signal.set(false);
+        }
+        active_consumers_per_topic_per_broker_ip.get(broker_ip).remove(topic);
+        remove_topic_from_broker_connector(topic);
+        exit_status = 0;
+        return exit_status;
+    }
+}
\ No newline at end of file
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriptionDetails.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriptionDetails.java
new file mode 100644
index 0000000..65ba8f6
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/BrokerSubscriptionDetails.java
@@ -0,0 +1,79 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import static eu.nebulous.resource.discovery.broker_communication.BrokerPublisher.EMPTY;
+
+public class BrokerSubscriptionDetails {
+    String broker_username = "admin";
+    String broker_password = "admin";
+    String broker_ip = "localhost";
+    int broker_port = 5672;
+    String application_name = "default_application";
+    String topic = EMPTY;
+
+    public BrokerSubscriptionDetails(String broker_ip, int broker_port, String broker_username, String broker_password,String application_name, String topic) {
+        this.broker_ip = broker_ip;
+        this.broker_port = broker_port;
+        this.broker_username = broker_username;
+        this.broker_password = broker_password;
+        this.topic = topic;
+        this.application_name = application_name;
+    }
+
+    public BrokerSubscriptionDetails(boolean fake_broker_subscription) {
+        if (fake_broker_subscription) {
+            this.broker_username = EMPTY;
+            this.broker_password = EMPTY;
+            this.broker_ip = EMPTY;
+            this.topic = EMPTY;
+            this.application_name = EMPTY;
+        }
+    }
+
+    public String getBroker_username() {
+        return broker_username;
+    }
+
+    public void setBroker_username(String broker_username) {
+        this.broker_username = broker_username;
+    }
+
+    public String getBroker_password() {
+        return broker_password;
+    }
+
+    public void setBroker_password(String broker_password) {
+        this.broker_password = broker_password;
+    }
+
+    public String getBroker_ip() {
+        return broker_ip;
+    }
+
+    public void setBroker_ip(String broker_ip) {
+        this.broker_ip = broker_ip;
+    }
+
+    public String getApplication_name() {
+        return application_name;
+    }
+
+    public void setApplication_name(String application_name) {
+        this.application_name = application_name;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getBroker_port() {
+        return broker_port;
+    }
+
+    public void setBroker_port(int broker_port) {
+        this.broker_port = broker_port;
+    }
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java
new file mode 100644
index 0000000..6959a21
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/CustomConnectorHandler.java
@@ -0,0 +1,24 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import eu.nebulouscloud.exn.core.Context;
+import eu.nebulouscloud.exn.handlers.ConnectorHandler;
+
+public class CustomConnectorHandler extends ConnectorHandler {
+    private Context context;
+
+    @Override
+    public void onReady(Context context) {
+        this.context = context;
+    }
+    public void remove_consumer_with_key(String key){
+        context.unregisterConsumer(key);
+    }
+
+    public Context getContext() {
+        return context;
+    }
+
+    public void setContext(Context context) {
+        this.context = context;
+    }
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/ExtendedConnector.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/ExtendedConnector.java
new file mode 100644
index 0000000..1e09f3e
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/ExtendedConnector.java
@@ -0,0 +1,99 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import eu.nebulouscloud.exn.Connector;
+import eu.nebulouscloud.exn.core.Consumer;
+import eu.nebulouscloud.exn.core.Context;
+import eu.nebulouscloud.exn.core.Publisher;
+import eu.nebulouscloud.exn.handlers.ConnectorHandler;
+import eu.nebulouscloud.exn.settings.ExnConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.catalina.util.CustomObjectInputStream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class ExtendedConnector extends Connector {
+    private CustomConnectorHandler handler;
+
+    private Connector connector;
+    public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, boolean enableHealth, ExnConfig configuration) {
+        super(component, handler, publishers, consumers, enableState, enableHealth, configuration);
+        this.handler =(CustomConnectorHandler) handler;
+    }
+
+    public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, boolean enableState, ExnConfig configuration) {
+        super(component, handler, publishers, consumers, enableState, configuration);
+        this.handler = (CustomConnectorHandler) handler;
+    }
+
+    public ExtendedConnector(String component, ConnectorHandler handler, List<Publisher> publishers, List<Consumer> consumers, ExnConfig configuration) {
+        super(component, handler, publishers, consumers, configuration);
+        this.handler = (CustomConnectorHandler) handler;
+    }
+
+    public CustomConnectorHandler getHandler() {
+        return (CustomConnectorHandler) handler;
+    }
+
+    public void setHandler(CustomConnectorHandler handler) {
+        this.handler = handler;
+    }
+
+    public void remove_consumer_with_key(String key) {
+        try {
+            Context context = ((CustomConnectorHandler)handler).getContext();
+            context.unregisterConsumer(key);
+        }catch (ClassCastException c){
+            log.warn("Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
+        }
+    }
+
+    private void remove_publisher_with_key(String key) {
+        try {
+            Context context = ((CustomConnectorHandler)handler).getContext();
+            context.unregisterPublisher(key);
+        }catch (ClassCastException c){
+            log.warn("Could not unregister consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
+        }
+    }
+
+    public void add_consumer(Consumer newConsumer) {
+
+        try {
+            ((CustomConnectorHandler)handler).getContext().registerConsumer(newConsumer);
+        }catch (ClassCastException c){
+            log.warn("Could not register consumer, as the handler of the Connector it belongs to is not a CustomConnectorHandler");
+        }
+    }
+
+    public void stop(ArrayList<Consumer> consumers, ArrayList <Publisher> publishers){
+        if (consumers.size()>0) {
+            stop_consumers(consumers);
+        }
+        if (publishers.size()>0) {
+            stop_publishers(publishers);
+        }
+    }
+
+
+    public void stop_consumers(ArrayList<Consumer> consumers){
+        for (Consumer consumer : consumers){
+            remove_consumer_with_key(consumer.key());
+        }
+    }
+    public void stop_publishers(ArrayList<Publisher> publishers){
+        for (Publisher publisher : publishers){
+            remove_publisher_with_key(publisher.key());
+        }
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/JsonFileParser.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/JsonFileParser.java
new file mode 100644
index 0000000..210d10b
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/JsonFileParser.java
@@ -0,0 +1,27 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import java.io.FileReader;
+
+public class JsonFileParser {
+    public static JSONObject parse(String file_name){
+        JSONParser parser = new JSONParser();
+        try {
+            Object obj = parser.parse(new FileReader(file_name));
+            JSONObject jsonObject = (JSONObject) obj;
+            // Access properties of the parsed JSON object here
+            return jsonObject;
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return new JSONObject();
+    }
+    public static void main(String[] args) {
+        String currentDir = System.getProperty("user.dir");
+        System.out.println("Current Directory: " + currentDir);
+        String parsed_file_string = parse("./src/main/java/eu/nebulous/resource/discovery/broker_communication/file.json").toString();
+        System.out.println(parsed_file_string);
+    }
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java
new file mode 100644
index 0000000..10f28da
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SALCommunicator.java
@@ -0,0 +1,355 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.entity.mime.content.StringBody;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+
+/**
+ * Assuming that only one SALCommunicator will exist - otherwise some variables should have their characterization as 'static' be removed
+ */
+@Slf4j
+public class SALCommunicator {
+
+    private static String sal_host = "localhost";
+    private static String sal_port = "9000";
+
+    private static String mylogin = "admin";
+    private static String mypassword = "admin";
+
+
+    public static void register_device(int cpu_cores, int ram_gb, int disk, String application_name, String internal_ip_address, String external_ip_address, String city, String country, String latitude, String longitude){
+
+
+
+    }
+
+    public static String get_connection_id(String sal_host, int sal_port, String sal_username, String sal_password){
+        // Request 1 - Get sessionID
+        HashMap<String,String> authentication_map = new HashMap<>();
+        authentication_map.put("username",sal_username);
+        authentication_map.put("password",sal_password);
+        String sessionID = sendPOSTRequest("http://"+sal_host+":"+sal_port+"/sal/pagateway/connect", new HashMap<>(), authentication_map);
+        log.info("Retrieved session id "+sessionID);
+        return sessionID;
+    }
+
+    public static void main(String[] args) {
+
+
+
+
+        //String contentType = "application/json";
+
+        String sessionID = get_connection_id("localhost",9000,"admin","admin");
+        log.info("The session id is "+sessionID);
+        ArrayList<String> applications = get_running_applications(request_running_applications_REST(sessionID));
+        log.info("The running apps are "+applications.toString());
+
+        register_devices("./src/main/resources/sal_device_registration_base_payload.json", sessionID, applications,"10.100.100","100.100.100.",10,10,10,"test12","test_provider","Athens","Greece",100);
+        // Request 4
+        //String payload4 = "{\"key3\": \"value3\"}";
+        //sendRequest("https://api.example.com/endpoint3", sessionID, contentType, payload4);
+    }
+
+    public static String request_running_applications_AMQP() {
+        // Request 2 - Get available jobs
+        //String get_jobs_string = sendGETRequest("http://localhost:9000/sal/job/" );
+        //return  get_jobs_string;
+        return null;
+    }
+
+    private static String request_running_applications_REST(String sessionID) {
+
+        // Request 2 - Get available jobs
+        String get_jobs_payload = "{\"sessionid\": \""+sessionID+"\"}";
+        HashMap<String,String> session_id_headers = new HashMap<>();
+        session_id_headers.put("sessionid",sessionID);
+        log.info("Using temporary \"job\" endpoint to get the jobs from SAL...");
+        String get_jobs_string = sendGETRequest("http://localhost:9000/sal/job/",session_id_headers );
+        return  get_jobs_string;
+    }
+
+    private static void register_devices(String request_body_file, String sessionID, ArrayList<String> applications,String internal_ip_address, String external_ip_address, int cpu_cores, int ram_gb, int disk_gb, String device_name,String provider_id, String city_name, String country_name, int number_of_devices_to_register) {
+
+        for (int counter = 0; counter < number_of_devices_to_register; counter++) {
+            JSONObject json = JsonFileParser.parse(request_body_file);
+            if (number_of_devices_to_register>1) { //Test mode, TODO delete
+
+                json.put("name", "test" + counter);
+
+                ((JSONObject) ((JSONArray) json.get("ipAddresses")).get(0)).put("value", internal_ip_address + counter);
+                ((JSONObject) json.get("nodeProperties")).put("disk", new Random().nextInt(1, 101));
+                ((JSONObject) json.get("nodeProperties")).put("memory", new Random().nextInt(1, 17));
+                ((JSONObject) json.get("nodeProperties")).put("providerId", String.valueOf(new Random().nextInt(1, 21)));
+                ((JSONObject) json.get("nodeProperties")).put("numberOfCores", new Random().nextInt(1, 17));
+
+                String[] country_choices = {"Greece", "Poland", "France"};
+                String[] city_choices = {"Athens", "Warsaw", "Nice"};
+                int random_int = new Random().nextInt(0, 3);
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("country", country_choices[random_int]);
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("city", city_choices[random_int]);
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90));
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("longitude", new Random().nextFloat(-90, 90));
+                // Request 3 - Register device for a particular job
+            }else{
+                json.put("name", device_name);
+                ((JSONObject) ((JSONArray) json.get("ipAddresses")).get(0)).put("value", internal_ip_address);
+                ((JSONObject) ((JSONArray) json.get("ipAddresses")).get(1)).put("value", external_ip_address);
+                ((JSONObject) json.get("nodeProperties")).put("disk", disk_gb);
+                ((JSONObject) json.get("nodeProperties")).put("memory", ram_gb);
+                ((JSONObject) json.get("nodeProperties")).put("providerId", provider_id);
+                ((JSONObject) json.get("nodeProperties")).put("numberOfCores", cpu_cores);
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("country", country_name);
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("city", city_name);
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90));
+                ((JSONObject) ((JSONObject) json.get("nodeProperties")).get("geoLocation")).put("longitude", new Random().nextFloat(-90, 90));
+            }
+
+            System.out.println(json.toJSONString());
+
+            for (String application : applications) {
+                json.put("jobId", application);
+                String payload3 = json.toJSONString();
+                HashMap<String, String> headers = new HashMap<>();
+                headers.put("sessionid", sessionID);
+                headers.put("Content-Type", "application/json");
+                sendPOSTRequest("http://" + sal_host + ":" + sal_port + "/sal/edge/" + application, headers, payload3);
+
+            }
+        }
+    }
+
+
+    public static String get_device_registration_json(String internal_ip_address, String external_ip_address, int cpu_cores, int ram_gb, int disk_gb, String device_name,String provider_id, String city_name, String country_name, String device_username, String device_password) {
+
+            JSONObject root_json_object = new JSONObject();
+            JSONObject loginCredential = new JSONObject();
+            JSONObject ipAddress1 = new JSONObject();
+            JSONObject ipAddress2 = new JSONObject();
+            JSONObject operatingSystem = new JSONObject();
+            JSONObject geoLocation = new JSONObject();
+            JSONObject nodeProperties = new JSONObject();
+
+            loginCredential.put("username", device_username);
+            loginCredential.put("password", device_password);
+            loginCredential.put("privateKey", "");
+
+
+            ipAddress1.put("IpAddressType", "PUBLIC_IP");
+            ipAddress1.put("IpVersion", "V4");
+            ipAddress1.put("value", external_ip_address);
+
+            ipAddress2.put("IpAddressType", "PRIVATE_IP");
+            ipAddress2.put("IpVersion", "V4");
+            ipAddress2.put("value", internal_ip_address);
+
+
+            operatingSystem.put("operatingSystemFamily", "UBUNTU");
+            operatingSystem.put("operatingSystemArchitecture", "ARMv8");
+            operatingSystem.put("operatingSystemVersion", 1804);
+
+            geoLocation.put("city", city_name);
+            geoLocation.put("country", country_name);
+            geoLocation.put("latitude", new Random().nextFloat(-90, 90));
+            geoLocation.put("longitude", new Random().nextFloat(-90, 90));
+
+            nodeProperties.put("providerId", provider_id);
+            nodeProperties.put("numberOfCores", cpu_cores);
+            nodeProperties.put("memory", ram_gb);
+            nodeProperties.put("disk", disk_gb);
+            nodeProperties.put("operatingSystem", operatingSystem);
+            nodeProperties.put("geoLocation", geoLocation);
+
+            root_json_object.put("name", device_name);
+            root_json_object.put("loginCredential", loginCredential);
+
+            JSONArray ipAddresses = new JSONArray();
+            ipAddresses.add(ipAddress1);
+            ipAddresses.add(ipAddress2);
+            root_json_object.put("ipAddresses", ipAddresses);
+
+            root_json_object.put("nodeProperties", nodeProperties);
+            root_json_object.put("systemArch", "ARMv8");
+            root_json_object.put("scriptURL", "https://www.google.com");
+            root_json_object.put("jarURL", "https://www.activeeon.com/public_content/7cde3381417ff3784639dc41fa7e7cd0544a5234-morphemic-7bulls/node_13.1.0-SNAPSHOT_armv8.jar");
+
+
+            //JSONObject root_json_object = JsonFileParser.parse(request_body_file);
+            //root_json_object.put("name", device_name);
+            //((JSONObject) ((JSONArray) root_json_object.get("ipAddresses")).get(0)).put("value", internal_ip_address);
+            //((JSONObject) ((JSONArray) root_json_object.get("ipAddresses")).get(1)).put("value", external_ip_address);
+            //((JSONObject) root_json_object.get("nodeProperties")).put("disk", disk_gb);
+            //((JSONObject) root_json_object.get("nodeProperties")).put("memory", ram_gb);
+            //((JSONObject) root_json_object.get("nodeProperties")).put("providerId", provider_id);
+            //((JSONObject) root_json_object.get("nodeProperties")).put("numberOfCores", cpu_cores);
+            //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("country", country_name);
+            //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("city", city_name);
+            //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("latitude", new Random().nextFloat(-90, 90));
+            //((JSONObject) ((JSONObject) root_json_object.get("nodeProperties")).get("geoLocation")).put("longitude", new Random().nextFloat(-90, 90));
+            return(root_json_object.toJSONString());
+    }
+
+
+    public static ArrayList<String> get_running_applications(String running_jobs_string) {
+
+        ArrayList<String>applications = new ArrayList<>();
+        JSONParser parser = new JSONParser();
+        try{
+            Object received_json = parser.parse(running_jobs_string);
+            if (received_json instanceof JSONArray) {
+                JSONArray jobs_array = (JSONArray) parser.parse(running_jobs_string);
+                for (int i = 0; i < jobs_array.size(); i++) {
+                    JSONObject json_job_object = (JSONObject) jobs_array.get(i);
+                    applications.add((String) json_job_object.get("jobId"));
+                }
+            }else if (received_json instanceof JSONObject){
+                JSONObject json_job_object = (JSONObject) received_json;
+                applications.add((String) json_job_object.get("jobId"));
+            }
+        }catch (Exception e){
+            e.printStackTrace();
+            System.out.println("This is the input json jobs string\n\n");
+            System.out.println(running_jobs_string);
+        }
+        return  applications;
+    }
+
+    private static String sendGETRequest(String url, HashMap<String,String>headers) {
+        String response_string = "";
+        CloseableHttpClient client = HttpClients.createDefault();
+        HttpGet httpGet = new HttpGet(url);
+
+        // Set headers
+        for (Map.Entry<String,String> entry : headers.entrySet()) {
+            httpGet.setHeader(entry.getKey(), entry.getValue());
+        }
+
+        CloseableHttpResponse response = null;
+        try {
+            response = client.execute(httpGet);
+            HttpEntity responseEntity = response.getEntity();
+            if (responseEntity != null) {
+                InputStream inputStream = responseEntity.getContent();
+                response_string = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+                byte[] buffer = new byte[1024];
+                int bytesRead;
+                while ((bytesRead = inputStream.read(buffer)) != -1) {
+                    System.out.write(buffer, 0, bytesRead);
+                }
+            }
+            response.close();
+            client.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        System.out.println("Status code: " + response.getStatusLine().getStatusCode());
+        return response_string;
+    }
+
+    public static String sendPOSTRequest(String urlString, HashMap<String,String> headers, HashMap<String,String> multipart_form) {
+        String response_string = "Invalid response";
+        HttpPost post = new HttpPost(urlString);
+        try {
+            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
+            // Create a multipart entity
+            if (!multipart_form.isEmpty()) {
+                for (Map.Entry<String,String> entry : multipart_form.entrySet()) {
+                    builder.addPart(entry.getKey(), new StringBody(entry.getValue(), ContentType.TEXT_PLAIN));
+                }
+                //post.setHeader("Content-Type", "multipart/form-data");
+                HttpEntity entity = builder.build();
+                post.setEntity(entity);
+            }
+
+            if (!headers.isEmpty()) {
+                for (Map.Entry<String,String> entry : headers.entrySet()) {
+                    post.setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+
+
+            CloseableHttpClient client = HttpClients.createDefault();
+
+
+            HttpResponse response = client.execute(post);
+            HttpEntity responseEntity = response.getEntity();
+            if (responseEntity != null) {
+                InputStream inputStream = responseEntity.getContent();
+                response_string = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+                System.out.println("Printing before "+response_string);
+                byte[] buffer = new byte[1024];
+                int bytesRead;
+                while ((bytesRead = inputStream.read(buffer)) != -1) {
+                    System.out.write(buffer, 0, bytesRead);
+                }
+            }
+
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+        System.out.println("Returning the response string "+response_string);
+        return response_string;
+    }
+
+
+
+    public static String sendPOSTRequest(String urlString, HashMap<String,String> headers, String payload) {
+        String response_string = "";
+        CloseableHttpClient httpclient = HttpClients.createDefault();
+        HttpPost httpPost = new HttpPost(urlString);
+
+        // Set headers
+        if (!headers.isEmpty()) {
+            for (Map.Entry<String,String> entry : headers.entrySet()) {
+                httpPost.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+        // Define JSON payload
+        String jsonPayload = payload;
+        StringEntity entity = null;
+        try {
+            entity = new StringEntity(jsonPayload);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        httpPost.setEntity(entity);
+
+        // Execute the request
+        CloseableHttpResponse response = null;
+        try {
+            response = httpclient.execute(httpPost);
+
+            // Handle the response
+            HttpEntity responseEntity = response.getEntity();
+            response_string = EntityUtils.toString(responseEntity);
+            // Close resources
+            response.close();
+            httpclient.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return response_string;
+    }
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java
new file mode 100644
index 0000000..3958e17
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/broker_communication/SynchronousBrokerPublisher.java
@@ -0,0 +1,149 @@
+package eu.nebulous.resource.discovery.broker_communication;
+
+import eu.nebulouscloud.exn.core.Publisher;
+import eu.nebulouscloud.exn.core.SyncedPublisher;
+import eu.nebulouscloud.exn.settings.StaticExnConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Marker;
+
+import java.util.*;
+
+
+@Slf4j
+public class SynchronousBrokerPublisher {
+    public static String EMPTY="";
+    private static HashMap<String, HashSet<String>> broker_and_topics_to_publish_to = new HashMap<>();
+    private SyncedPublisher private_publisher_instance;
+    private ArrayList<Publisher> publishers = new ArrayList<>();
+
+    private ExtendedConnector active_connector;
+    private String topic;
+    private String broker_ip;
+
+    public SynchronousBrokerPublisher(String topic, String broker_ip, int broker_port, String brokerUsername, String brokerPassword, String amqLibraryConfigurationLocation) {
+
+        boolean able_to_initialize_BrokerPublisher = topic!=null && broker_ip!=null && brokerUsername!=null && brokerPassword!=null && !topic.equals(EMPTY) && !broker_ip.equals(EMPTY) && !brokerUsername.equals(EMPTY) && !brokerPassword.equals(EMPTY);
+
+        if (!able_to_initialize_BrokerPublisher){
+            return;
+        }
+        boolean publisher_configuration_changed;
+        if (!broker_and_topics_to_publish_to.containsKey(broker_ip)){
+            HashSet<String> topics_to_publish_to = new HashSet<>();
+            topics_to_publish_to.add(topic);
+            broker_and_topics_to_publish_to.put(broker_ip,topics_to_publish_to);
+            log.error("changed1");
+            publisher_configuration_changed = true;
+        }else{
+            if (!broker_and_topics_to_publish_to.get(broker_ip).contains(topic)){
+                broker_and_topics_to_publish_to.get(broker_ip).add(topic);
+                log.error("changed2");
+                publisher_configuration_changed = true;
+            }
+            else{
+                publisher_configuration_changed = false;
+            }
+        }
+
+        log.error("preliminary_outside");
+        if (publisher_configuration_changed){
+            log.error("preliminary_inside1");
+//            for (String current_broker_ip : broker_and_topics_to_publish_to.keySet()){
+            log.info("Publisher configuration changed, creating new connector at  "+broker_ip+" for topic "+topic);
+            if (active_connector!=null) {
+                active_connector.stop(new ArrayList<>(), publishers);
+            }
+            publishers.clear();
+            for (String broker_topic : broker_and_topics_to_publish_to.get(broker_ip)){
+                log.error("preliminary_inside2");
+                //ArrayList<Publisher> publishers = new ArrayList<>();
+                SyncedPublisher publisher = new SyncedPublisher("resource_manager_"+broker_topic, broker_topic, true, true);
+                publishers.add(publisher);
+                if (broker_topic.equals(topic)){
+                    log.error("inside_assignment_to_private_publisher_instance");
+                    this.private_publisher_instance = (SyncedPublisher) publishers.get(publishers.size()-1);
+                    this.topic = broker_topic;
+                    this.broker_ip = broker_ip;
+                }
+            }
+            //CustomConnectorHandler custom_handler = new CustomConnectorHandler();
+
+            active_connector = new ExtendedConnector("resource_manager"
+                    , new CustomConnectorHandler() {}
+                    , publishers
+                    , List.of(),
+                    false,
+                    false,
+                    new StaticExnConfig(
+                            broker_ip,
+                            broker_port,
+                            brokerUsername,
+                            brokerPassword,
+                            60,
+                            EMPTY
+                    )
+            );
+            active_connector.start();
+
+        }
+    }
+
+
+    public Map publish_for_response (String json_string_content, Collection<String> application_names){
+        Map reply = null;
+        HashMap<String,Object> payload = new HashMap<>();
+        HashMap<String,String> metadata = new HashMap<>();
+        metadata.put("user","admin");
+        metadata.put("type","edge");
+        if (application_names!=null && !application_names.isEmpty()) {
+            for (String application_name : application_names) {
+
+                boolean successful_json_parsing = false;
+                JSONParser parser = new JSONParser();
+                JSONObject json_object = new JSONObject();
+                try {
+                    json_object = (JSONObject) parser.parse(json_string_content);
+                    successful_json_parsing = true;
+                } catch (ParseException p) {
+                    log.warn("Could not parse the string content to be published to the broker as json, which is the following: " + json_string_content);
+                }
+                metadata.put("jobId",application_name);
+                payload.put("metaData",metadata);
+                if (private_publisher_instance != null) {
+                    //reply = private_publisher_instance.sendSync(json_object, application_name, null, false);
+                    if (successful_json_parsing) {
+                        json_object.put("jobId",application_name);
+                        payload.put("body",json_object.toJSONString());
+                        reply = private_publisher_instance.sendSync(payload, application_name, null, false);
+                    }else{
+                        payload.put("body",json_string_content);
+                        log.warn(Marker.ANY_MARKER,"Sending the original json string without any modification as its parsing was not successful");
+                        reply = private_publisher_instance.sendSync(payload, application_name, null, false);
+                    }
+                } else {
+                    log.error("Could not send message to AMQP broker, as the private publisher instance is null (is broker ip specified?)");
+                }
+            }
+        }else{ //Send an empty string for application
+            JSONParser parser = new JSONParser();
+            JSONObject json_object = new JSONObject();
+            try {
+                json_object = (JSONObject) parser.parse(json_string_content);
+
+            } catch (ParseException p) {
+                log.warn("Could not parse the string content to be published to the broker as json, which is the following: " + json_string_content);
+            }
+            if (private_publisher_instance != null) {
+                log.info("Sending new synchronous message\n"+json_object.toJSONString());
+                reply = private_publisher_instance.sendSync(json_object,EMPTY, null, false);
+                log.info("Sent new synchronous message\n"+json_object.toJSONString());
+            } else {
+                log.error("Could not send message to AMQP broker, as the private publisher instance is null (is broker ip specified?)");
+            }
+        }
+        return reply;
+    }
+}
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java
index 054135e..8fe87bb 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/DeviceProcessor.java
@@ -2,11 +2,14 @@
 package eu.nebulous.resource.discovery.monitor;
 
 import eu.nebulous.resource.discovery.ResourceDiscoveryProperties;
+import eu.nebulous.resource.discovery.broker_communication.BrokerPublisher;
+import java.time.Clock;
 import eu.nebulous.resource.discovery.monitor.model.Device;
 import eu.nebulous.resource.discovery.monitor.model.DeviceStatus;
 import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.json.simple.JSONObject;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableAsync;
@@ -17,6 +20,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -130,6 +134,12 @@ public class DeviceProcessor  implements InitializingBean {
                     && device.getCreationDate().isBefore(failedDeviceThreshold) )
             {
                 device.setStatus(DeviceStatus.FAILED);
+                JSONObject lost_device_message = new JSONObject();
+                lost_device_message.put("device_name",device.getName());
+                Clock clock = Clock.systemUTC();
+                lost_device_message.put("timestamp",(int)(clock.millis()/1000));
+                BrokerPublisher device_lost_publisher = new BrokerPublisher(processorProperties.getLost_device_topic(), processorProperties.getNebulous_broker_ip_address(), processorProperties.getNebulous_broker_port(),processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+                device_lost_publisher.publish(lost_device_message.toJSONString(), Collections.singleton(""));
                 log.warn("processFailedDevices: Marked as FAILED device with Id: {}", device.getId());
             }
 
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java
index 8b2c213..d968f8e 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/controller/DeviceManagementController.java
@@ -7,6 +7,7 @@ import eu.nebulous.resource.discovery.monitor.model.DeviceException;
 import eu.nebulous.resource.discovery.monitor.service.DeviceLifeCycleRequestService;
 import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
 import eu.nebulous.resource.discovery.registration.model.RegistrationRequestException;
+import eu.nebulous.resource.discovery.registration.service.SALRegistrationService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@ public class DeviceManagementController {
 	private final DeviceProcessor deviceProcessor;
 	private final DeviceManagementService deviceService;
 	private final DeviceLifeCycleRequestService deviceLifeCycleRequestService;
+	private final SALRegistrationService salRegistrationService;
 
 	private boolean isAuthenticated(Authentication authentication) {
 		return authentication!=null && StringUtils.isNotBlank(authentication.getName());
@@ -83,6 +85,8 @@ public class DeviceManagementController {
 
 	@PutMapping(value = "/device", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
 	public Device createDevice(@RequestBody Device device) {
+
+		salRegistrationService.register(device);
 		return deviceService.save(device);
 	}
 
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java
index 7aa5ded..dc41182 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/model/Device.java
@@ -21,6 +21,7 @@ public class Device {
     private String name;
     private String owner;
     private String ipAddress;
+    private int port;
     private DeviceLocation location;
     private String username;
     @ToString.Exclude
@@ -40,6 +41,7 @@ public class Device {
 
     private String nodeReference;
     @Setter(AccessLevel.NONE)
+    @Builder.Default
     private List<String> messages = new ArrayList<>();
 
     private DeviceStatusUpdate statusUpdate;
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceManagementService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceManagementService.java
index 8bf7802..bcf4a67 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceManagementService.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/DeviceManagementService.java
@@ -192,6 +192,7 @@ public class DeviceManagementService {
 		result.get().setArchiveDate(Instant.now());
 		archivedDeviceRepository.save(deviceConversionService.toArchivedDevice(result.get()));
 		deviceRepository.delete(result.get());
+		//XXX:TODO: Send notification to SAL to deregister Device
 	}
 
 	public void unarchiveDevice(String id, Map<String,String> credentials) {
@@ -208,6 +209,7 @@ public class DeviceManagementService {
 		restoredDevice.setPublicKey(credentials.get("publicKey").toCharArray());
 		deviceRepository.save(restoredDevice);
 		archivedDeviceRepository.deleteById(result.get().getId());
+		//XXX:TODO: Send notification to SAL to re-register Device
 	}
 
 	private void checkCredentials(String id, Map<String, String> credentials) {
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java
index 7bb4fc1..290fa3b 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/monitor/service/UnknownDeviceRegistrationService.java
@@ -9,18 +9,19 @@ import eu.nebulous.resource.discovery.monitor.model.Device;
 import eu.nebulous.resource.discovery.monitor.model.DeviceStatus;
 import eu.nebulous.resource.discovery.registration.model.RegistrationRequest;
 import eu.nebulous.resource.discovery.registration.service.RegistrationRequestService;
+import eu.nebulous.resource.discovery.registration.service.SALRegistrationService;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.scheduling.TaskScheduler;
-import org.springframework.stereotype.Service;
+//import org.springframework.stereotype.Service;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.util.*;
 
 @Slf4j
-@Service
+//@Service
 public class UnknownDeviceRegistrationService extends AbstractMonitorService {
     private final static List<String> MONITORED_REQUEST_TYPES = List.of(
             REQUEST_TYPE.INFO.name(),
@@ -29,16 +30,18 @@ public class UnknownDeviceRegistrationService extends AbstractMonitorService {
     );
     private final RegistrationRequestService registrationRequestService;
     private final DeviceManagementService deviceManagementService;
+    private final SALRegistrationService salRegistrationService;
     private final Map<String, String> detectedDevices = Collections.synchronizedMap(new LinkedHashMap<>());
     private final List<Map> deviceDetailsQueue = Collections.synchronizedList(new LinkedList<>());
 
     public UnknownDeviceRegistrationService(ResourceDiscoveryProperties monitorProperties, TaskScheduler taskScheduler,
                                             ObjectMapper objectMapper, DeviceManagementService deviceManagementService,
-                                            RegistrationRequestService registrationRequestService, BrokerUtil brokerUtil)
+                                            RegistrationRequestService registrationRequestService, BrokerUtil brokerUtil, SALRegistrationService salRegistrationService)
     {
         super("UnknownDeviceRegistrationService", monitorProperties, taskScheduler, objectMapper, brokerUtil);
         this.registrationRequestService = registrationRequestService;
         this.deviceManagementService = deviceManagementService;
+        this.salRegistrationService = salRegistrationService;
     }
 
     @Override
@@ -243,6 +246,11 @@ public class UnknownDeviceRegistrationService extends AbstractMonitorService {
                         .build();
                 newDevice = deviceManagementService.save(newDevice);
                 log.info("UnknownDeviceRegistrationService: Registered device: {}", newDevice);
+
+                log.info("Registering the device {} to SAL...",newDevice);
+                salRegistrationService.register(newDevice);
+
+
             } catch (Exception e) {
                 log.warn("UnknownDeviceRegistrationService: EXCEPTION while processing device details response: {}\nException: ", map, e);
             }
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java
index f8871f7..ce40a47 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/RegistrationRequestProcessor.java
@@ -9,6 +9,7 @@ import eu.nebulous.resource.discovery.monitor.service.DeviceManagementService;
 import eu.nebulous.resource.discovery.registration.model.RegistrationRequest;
 import eu.nebulous.resource.discovery.registration.model.RegistrationRequestStatus;
 import eu.nebulous.resource.discovery.registration.service.RegistrationRequestService;
+import eu.nebulous.resource.discovery.registration.service.SALRegistrationService;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -50,6 +51,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
 	private final ResourceDiscoveryProperties processorProperties;
 	private final RegistrationRequestService registrationRequestService;
 	private final DeviceManagementService deviceManagementService;
+	private final SALRegistrationService salRegistrationService;
 	private final TaskScheduler taskScheduler;
 	private final ObjectMapper objectMapper;
 	private final BrokerUtil brokerUtil;
@@ -177,6 +179,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
 					"deviceOs", registrationRequest.getDevice().getOs(),
 					"deviceName", registrationRequest.getDevice().getName(),
 					"deviceIpAddress", registrationRequest.getDevice().getIpAddress(),
+					"devicePort", Integer.toString( registrationRequest.getDevice().getPort() ),
 					"deviceUsername", registrationRequest.getDevice().getUsername(),
 					"devicePassword", new String(registrationRequest.getDevice().getPassword()),
 					"devicePublicKey", new String(registrationRequest.getDevice().getPublicKey())
@@ -257,7 +260,7 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
 			String ipAddress = registrationRequest.getDevice().getIpAddress();
 			boolean isError = false;
 			if (StringUtils.isNotBlank(deviceIpAddress) && ! StringUtils.equals(ipAddress, deviceIpAddress)) {
-				String mesg = String.format("Device IP address in RESPONSE does not match with that in the request: id=%s, ip-address-response=%s != ip-address-in-request%s", requestId, deviceIpAddress, ipAddress);
+				String mesg = String.format("Device IP address in RESPONSE does not match with that in the request: id=%s, ip-address-response=%s != ip-address-in-request=%s", requestId, deviceIpAddress, ipAddress);
 				log.warn("processResponse: {}", mesg);
 				registrationRequest.getMessages().add(mesg);
 				isError = true;
@@ -367,5 +370,6 @@ public class RegistrationRequestProcessor implements IRegistrationRequestProcess
 		device.setRequestId(registrationRequest.getId());
 		device.setNodeReference(registrationRequest.getNodeReference());
 		deviceManagementService.save(device);
+		salRegistrationService.register(device);
 	}
 }
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/Device.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/Device.java
index cdf04fa..34335b9 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/Device.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/Device.java
@@ -18,6 +18,7 @@ public class Device {
     private String name;
     private String owner;
     private String ipAddress;
+    private int port;
     private DeviceLocation location;
     private String username;
     @ToString.Exclude
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/RegistrationRequest.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/RegistrationRequest.java
index 210cf10..664c741 100644
--- a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/RegistrationRequest.java
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/model/RegistrationRequest.java
@@ -1,9 +1,6 @@
 package eu.nebulous.resource.discovery.registration.model;
 
-import lombok.AccessLevel;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.*;
 import lombok.experimental.SuperBuilder;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
@@ -25,9 +22,11 @@ public class RegistrationRequest {
     private Instant lastUpdateDate;
     private Instant archiveDate;
     private RegistrationRequestStatus status;
+    @Builder.Default
     private List<RegistrationRequestHistoryEntry> history = new ArrayList<>();
     private String nodeReference;
     @Setter(AccessLevel.NONE)
+    @Builder.Default
     private List<String> messages = new ArrayList<>();
 
     // Required in order BeanUtils.copyProperties() to also copy this
diff --git a/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java
new file mode 100644
index 0000000..1e0d08d
--- /dev/null
+++ b/resource-discovery/src/main/java/eu/nebulous/resource/discovery/registration/service/SALRegistrationService.java
@@ -0,0 +1,117 @@
+package eu.nebulous.resource.discovery.registration.service;
+
+import eu.nebulous.resource.discovery.ResourceDiscoveryProperties;
+import eu.nebulous.resource.discovery.broker_communication.SynchronousBrokerPublisher;
+import eu.nebulous.resource.discovery.monitor.model.Device;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.json.simple.JSONObject;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+
+import static eu.nebulous.resource.discovery.broker_communication.SALCommunicator.*;
+
+@Slf4j
+@Service
+public class SALRegistrationService implements InitializingBean {
+    @Autowired
+    private final ResourceDiscoveryProperties processorProperties;
+
+    public SALRegistrationService(ResourceDiscoveryProperties processorProperties) {
+        this.processorProperties = processorProperties;
+    }
+
+    public void register(Device device) {
+
+        String  application_name = "default-application"; //TODO decide on this
+        Map<String,String> device_info = device.getDeviceInfo();
+        /* Information available from the EMS, based on https://gitlab.com/nebulous-project/ems-main/-/blob/master/ems-core/bin/detect.sh?ref_type=heads
+        echo CPU_SOCKETS=$TMP_NUM_CPUS
+        echo CPU_CORES=$TMP_NUM_CORES
+        echo CPU_PROCESSORS=$TMP_NUM_PROCESSORS
+        echo RAM_TOTAL_KB=$TMP_RAM_TOTAL_KB
+        echo RAM_AVAILABLE_KB=$TMP_RAM_AVAILABLE_KB
+        echo RAM_FREE_KB=$TMP_RAM_FREE_KB
+        echo RAM_USED_KB=$TMP_RAM_USED_KB
+        echo RAM_UTILIZATION=$TMP_RAM_UTILIZATION
+        echo DISK_TOTAL_KB=$TMP_DISK_TOTAL_KB
+        echo DISK_FREE_KB=$TMP_DISK_FREE_KB
+        echo DISK_USED_KB=$TMP_DISK_USED_KB
+        echo DISK_UTILIZATION=$TMP_DISK_UTILIZATION
+        echo OS_ARCHITECTURE=$TMP_ARCHITECTURE
+        echo OS_KERNEL=$TMP_KERNEL
+        echo OS_KERNEL_RELEASE=$TMP_KERNEL_RELEASE
+         */
+        String device_name = device.getName();
+
+        Integer cores = Integer.parseInt(device_info.get("CPU_PROCESSORS"));
+        Integer ram_gb = Integer.parseInt(device_info.get("RAM_TOTAL_KB"))/1000000;
+        Integer disk_gb = Integer.parseInt(device_info.get("DISK_TOTAL_KB"))/1000000;
+        String external_ip_address = device.getIpAddress();
+        String device_username = device.getUsername();
+        String device_password = new String(device.getPassword());
+        String device_pub_key = new String(device.getPublicKey()); //TODO get here private key instead and pass this to device registration
+        //TODO implement provider here: String provider = device.getProvider();
+        //String network_rx =device_info.get("RX");
+        //String network_tx = device_info.get("TX");
+
+        Clock clock = Clock.systemUTC();
+
+        //JSONObject register_device_message = new JSONObject();
+        //register_device_message.put("device_name",device_name);
+        //register_device_message.put("timestamp",(int)(clock.millis()/1000));
+
+        String register_device_message_string = get_device_registration_json("10.100.100",external_ip_address,cores,ram_gb,disk_gb,device_name,"test_provider","Athens","Greece", device_username, device_password);
+        log.error("topic is "+get_registration_topic_name(application_name));
+        log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address());
+        log.error("broker port is "+processorProperties.getNebulous_broker_port());
+        log.error("username is "+processorProperties.getNebulous_broker_username());
+        log.error("password is "+processorProperties.getNebulous_broker_password());
+        //String sal_running_applications_reply = request_running_applications_AMQP();
+        //ArrayList<String> applications = get_running_applications(sal_running_applications_reply);
+        //for (String application_name:applications) {
+                SynchronousBrokerPublisher register_device_publisher = new SynchronousBrokerPublisher(get_registration_topic_name(application_name), processorProperties.getNebulous_broker_ip_address(),processorProperties.getNebulous_broker_port(), processorProperties.getNebulous_broker_username(), processorProperties.getNebulous_broker_password(), "");
+                //TODO handle the response here
+                Map response = register_device_publisher.publish_for_response(register_device_message_string, Collections.singleton(application_name));
+                log.info("The response received while trying to register device " + device_name);
+        //}
+
+        /* This is some realtime information, could be retrieved with a different call to the EMS.
+        CurrDateTime: 1709207141
+        UpDateTime: 1709186638
+        Uptime: 20503
+        CPU: 0
+        RAM: 31.4725
+        DISK: 10.3586
+        RX: 0
+        TX: 0
+         */
+
+    }
+
+    private String get_registration_topic_name(String application_name) {
+        return "eu.nebulouscloud.exn.sal.node.create";
+        //return ("eu.nebulouscloud.exn.sal.edge." + application_name);
+    }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        if (    processorProperties.getNebulous_broker_password()!=null &&
+                processorProperties.getNebulous_broker_username()!=null &&
+                processorProperties.getNebulous_broker_ip_address()!=null
+        ){
+            log.info("Successful setting of properties for communication with SAL");
+        }else{
+            log.error("broker ip is "+processorProperties.getNebulous_broker_ip_address());
+            log.error("username is "+processorProperties.getNebulous_broker_username());
+            log.error("password is "+processorProperties.getNebulous_broker_password());
+            throw new Exception("Required data is null - broker ip is "+processorProperties.getNebulous_broker_ip_address()+" username is "+processorProperties.getNebulous_broker_username()+" password is "+processorProperties.getNebulous_broker_password());
+        }
+    }
+}
diff --git a/resource-discovery/src/main/resources/application.yml b/resource-discovery/src/main/resources/application.yml
index ec64793..74ae20f 100644
--- a/resource-discovery/src/main/resources/application.yml
+++ b/resource-discovery/src/main/resources/application.yml
@@ -20,6 +20,13 @@ discovery:
   brokerURL: "ssl://localhost:61617?daemon=true&trace=false&useInactivityMonitor=false&connectionTimeout=0&keepAlive=true"
   brokerUsername: "aaa"
   brokerPassword: "111"
+  nebulous_broker_ip_address: "nebulous-activemq"
+  nebulous_broker_port: 5672
+  nebulous_broker_username: "admin"
+  nebulous_broker_password: "admin"
+  sal_host: "localhost"
+  sal_port: 8080
+  lost_device_topic: "eu.nebulouscloud.monitoring.device_lost"
   trustStoreFile: tests/config/broker-truststore.p12
   trustStorePassword: melodic
   trustStoreType: PKCS12
diff --git a/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-device-view.html b/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-device-view.html
index 6b4e34e..b21c45b 100644
--- a/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-device-view.html
+++ b/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-device-view.html
@@ -287,6 +287,13 @@ function flattenObject(ob) {
                                         <input type="text" readonly class="form-control-plaintext" id="device#ipAddress" value="" placeholder="Device IP address">
                                     </div>
                                 </div>
+                                <!-- Device Port -->
+                                <div class="form-group row">
+                                    <label for="device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
+                                    <div class="col-sm-10">
+                                        <input type="text" readonly class="form-control-plaintext" id="device#port" value="" placeholder="Device SSH port">
+                                    </div>
+                                </div>
                                 <!-- Device Location -->
                                 <div class="form-group row">
                                     <label for="device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>
diff --git a/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-request-view.html b/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-request-view.html
index 16d70ba..ea9e61d 100644
--- a/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-request-view.html
+++ b/resource-discovery/src/main/resources/static/freebees_webdesign_6/archived-request-view.html
@@ -300,6 +300,13 @@ function flattenObject(ob) {
                                 <input type="text" readonly class="form-control" id="request#device#ipAddress" value="" placeholder="Device IP address">
                             </div>
                         </div>
+                        <!-- Device Port -->
+                        <div class="form-group row">
+                            <label for="request#device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
+                            <div class="col-sm-10">
+                                <input type="text" readonly class="form-control" id="request#device#port" value="" placeholder="Device SSH port">
+                            </div>
+                        </div>
                         <!-- Device Location -->
                         <div class="form-group row">
                             <label for="request#device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>
diff --git a/resource-discovery/src/main/resources/static/freebees_webdesign_6/device-view.html b/resource-discovery/src/main/resources/static/freebees_webdesign_6/device-view.html
index 37b9a9c..c39924e 100644
--- a/resource-discovery/src/main/resources/static/freebees_webdesign_6/device-view.html
+++ b/resource-discovery/src/main/resources/static/freebees_webdesign_6/device-view.html
@@ -404,6 +404,13 @@ function sendDeviceData(deviceData) {
                                         <input type="text" readonly class="form-control-plaintext" id="device#ipAddress" value="" placeholder="Device IP address">
                                     </div>
                                 </div>
+                                <!-- Device Port -->
+                                <div class="form-group row">
+                                    <label for="device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
+                                    <div class="col-sm-10">
+                                        <input type="text" readonly class="form-control-plaintext" id="device#port" value="" placeholder="Device SSH port">
+                                    </div>
+                                </div>
                                 <!-- Device Location -->
                                 <div class="form-group row">
                                     <label for="device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>
diff --git a/resource-discovery/src/main/resources/static/freebees_webdesign_6/request-edit.html b/resource-discovery/src/main/resources/static/freebees_webdesign_6/request-edit.html
index 05dd881..20266ce 100644
--- a/resource-discovery/src/main/resources/static/freebees_webdesign_6/request-edit.html
+++ b/resource-discovery/src/main/resources/static/freebees_webdesign_6/request-edit.html
@@ -458,6 +458,13 @@ function sendRequestData(requestData) {
                                 <input type="text" class="form-control" id="request#device#ipAddress" value="" placeholder="Device IP address">
                             </div>
                         </div>
+                        <!-- Device Port -->
+                        <div class="form-group row">
+                            <label for="request#device#port" class="col-sm-2 col-form-label"><b>SSH port</b></label>
+                            <div class="col-sm-10">
+                                <input type="text" class="form-control" id="request#device#port" value="" placeholder="Device SSH port">
+                            </div>
+                        </div>
                         <!-- Device Location -->
                         <div class="form-group row">
                             <label for="request#device#location#name" class="col-sm-2 col-form-label"><b>Location</b></label>