From d759508426a8fb184a18b6fbfffe2cfd8e301469 Mon Sep 17 00:00:00 2001
From: Andreas Tsagkaropoulos <atsagkaropoulos@mail.ntua.gr>
Date: Thu, 25 Apr 2024 17:26:51 +0300
Subject: [PATCH] Minor fix

Correction in configuration file and correction in the way subscription is being performed for each application

Change-Id: I69f6e4a45a3e0e044a0fb0b741d9776262949504
---
 .../src/main/runtime/DataPersistor.py         | 29 ++++++++++++-------
 .../src/resources/config.properties           |  1 +
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/monitoring-data-persistor/src/main/runtime/DataPersistor.py b/monitoring-data-persistor/src/main/runtime/DataPersistor.py
index ec348df..651c298 100644
--- a/monitoring-data-persistor/src/main/runtime/DataPersistor.py
+++ b/monitoring-data-persistor/src/main/runtime/DataPersistor.py
@@ -36,25 +36,27 @@ class ConsumerHandler(Handler):
 
 class GenericConsumerHandler(Handler):
     connector_thread = None
-    initialized_connector = None
+    #initialized_connector = None
     application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers.
+    application_threads = {}
 
     def GenericConsumerHandler(self):
-        if self.connector_thread is not None:
-            self.initialized_connector.stop()
+        pass
+        #if self.connector_thread is not None:
+            #self.initialized_connector.stop()
     def on_message(self, key, address, body, context, **kwargs):
 
         if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic):
             application_name = body["name"]
             logging.info("New metrics list message for application "+application_name + " - registering new connector")
-            if (application_name in self.application_consumer_handler_connectors.keys()  is not None):
+            if (application_name in self.application_consumer_handler_connectors.keys()):
                 logging.info("Stopping the old existing connector...")
                 self.application_consumer_handler_connectors[application_name].stop()
             logging.info("Attempting to register new connector...")
-            self.initialized_connector = exn.connector.EXN(
+            self.application_consumer_handler_connectors[application_name] = exn.connector.EXN(
                 Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(),
                 consumers=[
-                    core.consumer.Consumer('monitoring',
+                    core.consumer.Consumer('monitoring-'+application_name,
                         Constants.monitoring_broker_topic + '.realtime.>',
                         application=application_name,
                         topic=True,
@@ -69,12 +71,17 @@ class GenericConsumerHandler(Handler):
             )
             logging.info("Connector ready to be registered")
             #connector.start()
-            self.application_consumer_handler_connectors[application_name] = self.initialized_connector
+            #self.application_consumer_handler_connectors[application_name] = self.initialized_connector
+            #self.application_consumer_handler_connectors[application_name].start()
             logging.info(f"Application specific connector registered for application {application_name}")
-            self.initialized_connector.start()
-            logging.info(f"Application specific connector started for application {application_name}")
+            #self.initialized_connector.start()
+
         #If threading support is explicitly required, uncomment these lines
-            #connector_thread = threading.Thread(target=self.initialized_connector.start,args=())
+            self.application_threads[application_name] = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
+            self.application_threads[application_name].start()
+            logging.info(f"Application specific connector started for application {application_name}")
+            self.application_threads[application_name].join()
+            #connector_thread = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
             #connector_thread.start()
             #connector_thread.join()
 
@@ -106,7 +113,7 @@ def main():
                                        port=Constants.broker_port,
                                        username=Constants.broker_username,
                                        password=Constants.broker_password
-                                       )
+                              )
     #connector.start()
     thread = threading.Thread(target=connector_instance.start,args=())
     thread.start()
diff --git a/monitoring-data-persistor/src/resources/config.properties b/monitoring-data-persistor/src/resources/config.properties
index fd8fb69..f2c9045 100644
--- a/monitoring-data-persistor/src/resources/config.properties
+++ b/monitoring-data-persistor/src/resources/config.properties
@@ -2,6 +2,7 @@ broker_ip=nebulous-activemq
 broker_port=5672
 broker_username=admin
 broker_password=admin
+influxdb_organization_name = my-org
 influxdb_hostname=nebulous-influxdb
 influxdb_username=my-user
 influxdb_password=my-password