From ebf51c32356891ec14be1fcd001723c664c70205 Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Wed, 29 Nov 2023 12:10:18 +0100 Subject: [PATCH] Correcting an invalid look variable which was not thread safe. Fixing handling of lists in constructors Change-Id: I491eb7bc8082a6c877b974af72947e45e24eae46 --- .../eu/nebulouscloud/exn/TestPublisher.groovy | 2 +- .../eu/nebulouscloud/exn/TestReceiver.groovy | 11 +++++----- .../eu/nebulouscloud/exn/Connector.groovy | 22 +++++++++++-------- .../eu/nebulouscloud/exn/core/Context.groovy | 2 +- .../eu/nebulouscloud/exn/core/Handler.groovy | 2 +- .../exn/handlers/ConnectorHandler.groovy | 2 +- 6 files changed, 22 insertions(+), 19 deletions(-) diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy index d012f3c..e93e975 100644 --- a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy @@ -33,7 +33,7 @@ class MyConnectorHandler extends ConnectorHandler { @Override - def onReady(AtomicReference context) { + def void onReady(AtomicReference context) { println ("Ready") diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy index 04d070f..1df9190 100644 --- a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy @@ -6,22 +6,21 @@ import eu.nebulouscloud.exn.handlers.ConnectorHandler import eu.nebulouscloud.exn.settings.StaticExnConfig import org.apache.qpid.protonj2.client.Message import org.apache.qpid.protonj2.client.exceptions.ClientException -import org.slf4j.Logger -import org.slf4j.LoggerFactory + import java.util.concurrent.atomic.AtomicReference class MyConnectorHandler extends ConnectorHandler { @Override - def onReady(AtomicReference context) { + def void onReady(AtomicReference context) { println ("Ready start working") } } class MyCustomConsumerHandler extends Handler{ @Override - def onMessage(String key, String address, Map body, Message message, AtomicReference context) { + def void onMessage(String key, String address, Map body, Message message, AtomicReference context) { println "Received by custom handler ${key} => ${address} = ${body}" } } @@ -38,7 +37,7 @@ public static void main(String[] args) { new Consumer("ui_health","health", new MyCustomConsumerHandler(), true), new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){ @Override - def Object onMessage(String key, String address, Map body, Message rawMessage, AtomicReference context) { + def void onMessage(String key, String address, Map body, Message rawMessage, AtomicReference context) { if(key == "ui_all"){ println "These are my preferences => ${body}" } @@ -46,7 +45,7 @@ public static void main(String[] args) { },true,true), ], new StaticExnConfig( - 'localhosts', + 'localhost', 5672, "admin", "admin" diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy index e68a3ab..fefdf72 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy @@ -57,15 +57,15 @@ public class Connector { ) ) - + List compiledPublishers = new ArrayList<>() if (enableState) { - publishers.add( + compiledPublishers.add( new StatePublisher() ) } if (enableHealth) { - publishers.add( + compiledPublishers.add( new SchedulePublisher( this.config.healthTimeout(), 'health', @@ -75,7 +75,8 @@ public class Connector { ) ) } - this.publishers = publishers + compiledPublishers.addAll(publishers) + this.publishers = compiledPublishers this.executorService = Executors.newCachedThreadPool(); } @@ -130,12 +131,15 @@ public class Connector { connectionOpts.reconnectEnabled(true); this.connection = client.connect(config.url(), config.port(), connectionOpts) for (Publisher p : publishers) { + String address = this.context.get().buildAddressFromLink(p) p.setLink(address,connection.openSender(address)) logger.debug("Registering publisher {}", p) this.context.get().registerPublisher(p) if (p instanceof SchedulePublisher){ + logger.debug("Adding scheduled publisher as scheduled publisher {}", p) + final Publisher threadPublisher = p; this.executorService.submit( new Runnable() { @Override @@ -143,12 +147,12 @@ public class Connector { boolean healthy = true while(healthy && running.get()){ try{ - logger.debug("Processing scheduled executor [{}] {} ",p.key, address) - p.send() - logger.debug("\t waiting for {} = {} ",address,p.delay) - Thread.sleep(p.delay*1000) + logger.debug("Processing scheduled executor [{}] {} ", threadPublisher.key, address) + threadPublisher.send() + logger.debug("\t waiting for {} = {} ",address, threadPublisher.delay) + Thread.sleep(threadPublisher.delay*1000) }catch (Exception e){ - logger.error("Error processing scheduled executor [{}] - disabling", p.key,e) + logger.error("Error processing scheduled executor [{}] - disabling", threadPublisher.key,e) healthy=false } } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy index c23b2f3..66f6595 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy @@ -30,7 +30,7 @@ class Context { } void registerConsumers(consumer) { - consumers[publisher.key()] = consumer + consumers[consumer.key()] = consumer } String buildAddressFromLink(Link link) { diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy index 466434c..a47d2bb 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicReference abstract class Handler { private static final Logger logger = LoggerFactory.getLogger(Consumer.class) - public onMessage(String key, String address, Map body, Message message, AtomicReference context){ + public void onMessage(String key, String address, Map body, Message message, AtomicReference context){ logger.debug("Default on message for delivery for {} => {} ({}) = {}", key, address, diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy index a52c2bc..cde5678 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy @@ -17,7 +17,7 @@ abstract class ConnectorHandler { } - public onReady(AtomicReference context){ + public void onReady(AtomicReference context){ }