Minor fix

Correction in configuration file and correction in the way subscription is being performed for each application

Change-Id: I69f6e4a45a3e0e044a0fb0b741d9776262949504
This commit is contained in:
Andreas Tsagkaropoulos 2024-04-25 17:26:51 +03:00
parent 3d111f38e7
commit d759508426
2 changed files with 19 additions and 11 deletions

View File

@ -36,25 +36,27 @@ class ConsumerHandler(Handler):
class GenericConsumerHandler(Handler): class GenericConsumerHandler(Handler):
connector_thread = None connector_thread = None
initialized_connector = None #initialized_connector = None
application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers. application_consumer_handler_connectors = {} #dictionary in which keys are applications and values are the consumer handlers.
application_threads = {}
def GenericConsumerHandler(self): def GenericConsumerHandler(self):
if self.connector_thread is not None: pass
self.initialized_connector.stop() #if self.connector_thread is not None:
#self.initialized_connector.stop()
def on_message(self, key, address, body, context, **kwargs): def on_message(self, key, address, body, context, **kwargs):
if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic): if (str(address)).startswith(Constants.monitoring_prefix+Constants.metric_list_topic):
application_name = body["name"] application_name = body["name"]
logging.info("New metrics list message for application "+application_name + " - registering new connector") logging.info("New metrics list message for application "+application_name + " - registering new connector")
if (application_name in self.application_consumer_handler_connectors.keys() is not None): if (application_name in self.application_consumer_handler_connectors.keys()):
logging.info("Stopping the old existing connector...") logging.info("Stopping the old existing connector...")
self.application_consumer_handler_connectors[application_name].stop() self.application_consumer_handler_connectors[application_name].stop()
logging.info("Attempting to register new connector...") logging.info("Attempting to register new connector...")
self.initialized_connector = exn.connector.EXN( self.application_consumer_handler_connectors[application_name] = exn.connector.EXN(
Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(), Constants.data_persistor_name + "-" + application_name, handler=Bootstrap(),
consumers=[ consumers=[
core.consumer.Consumer('monitoring', core.consumer.Consumer('monitoring-'+application_name,
Constants.monitoring_broker_topic + '.realtime.>', Constants.monitoring_broker_topic + '.realtime.>',
application=application_name, application=application_name,
topic=True, topic=True,
@ -69,12 +71,17 @@ class GenericConsumerHandler(Handler):
) )
logging.info("Connector ready to be registered") logging.info("Connector ready to be registered")
#connector.start() #connector.start()
self.application_consumer_handler_connectors[application_name] = self.initialized_connector #self.application_consumer_handler_connectors[application_name] = self.initialized_connector
#self.application_consumer_handler_connectors[application_name].start()
logging.info(f"Application specific connector registered for application {application_name}") logging.info(f"Application specific connector registered for application {application_name}")
self.initialized_connector.start() #self.initialized_connector.start()
logging.info(f"Application specific connector started for application {application_name}")
#If threading support is explicitly required, uncomment these lines #If threading support is explicitly required, uncomment these lines
#connector_thread = threading.Thread(target=self.initialized_connector.start,args=()) self.application_threads[application_name] = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
self.application_threads[application_name].start()
logging.info(f"Application specific connector started for application {application_name}")
self.application_threads[application_name].join()
#connector_thread = threading.Thread(target=self.application_consumer_handler_connectors[application_name].start,args=())
#connector_thread.start() #connector_thread.start()
#connector_thread.join() #connector_thread.join()
@ -106,7 +113,7 @@ def main():
port=Constants.broker_port, port=Constants.broker_port,
username=Constants.broker_username, username=Constants.broker_username,
password=Constants.broker_password password=Constants.broker_password
) )
#connector.start() #connector.start()
thread = threading.Thread(target=connector_instance.start,args=()) thread = threading.Thread(target=connector_instance.start,args=())
thread.start() thread.start()

View File

@ -2,6 +2,7 @@ broker_ip=nebulous-activemq
broker_port=5672 broker_port=5672
broker_username=admin broker_username=admin
broker_password=admin broker_password=admin
influxdb_organization_name = my-org
influxdb_hostname=nebulous-influxdb influxdb_hostname=nebulous-influxdb
influxdb_username=my-user influxdb_username=my-user
influxdb_password=my-password influxdb_password=my-password