diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy index d012f3c..e93e975 100644 --- a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy @@ -33,7 +33,7 @@ class MyConnectorHandler extends ConnectorHandler { @Override - def onReady(AtomicReference context) { + def void onReady(AtomicReference context) { println ("Ready") diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy index 04d070f..1df9190 100644 --- a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy @@ -6,22 +6,21 @@ import eu.nebulouscloud.exn.handlers.ConnectorHandler import eu.nebulouscloud.exn.settings.StaticExnConfig import org.apache.qpid.protonj2.client.Message import org.apache.qpid.protonj2.client.exceptions.ClientException -import org.slf4j.Logger -import org.slf4j.LoggerFactory + import java.util.concurrent.atomic.AtomicReference class MyConnectorHandler extends ConnectorHandler { @Override - def onReady(AtomicReference context) { + def void onReady(AtomicReference context) { println ("Ready start working") } } class MyCustomConsumerHandler extends Handler{ @Override - def onMessage(String key, String address, Map body, Message message, AtomicReference context) { + def void onMessage(String key, String address, Map body, Message message, AtomicReference context) { println "Received by custom handler ${key} => ${address} = ${body}" } } @@ -38,7 +37,7 @@ public static void main(String[] args) { new Consumer("ui_health","health", new MyCustomConsumerHandler(), true), new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){ @Override - def Object onMessage(String key, String address, Map body, Message rawMessage, AtomicReference context) { + def void onMessage(String key, String address, Map body, Message rawMessage, AtomicReference context) { if(key == "ui_all"){ println "These are my preferences => ${body}" } @@ -46,7 +45,7 @@ public static void main(String[] args) { },true,true), ], new StaticExnConfig( - 'localhosts', + 'localhost', 5672, "admin", "admin" diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy index e68a3ab..fefdf72 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy @@ -57,15 +57,15 @@ public class Connector { ) ) - + List compiledPublishers = new ArrayList<>() if (enableState) { - publishers.add( + compiledPublishers.add( new StatePublisher() ) } if (enableHealth) { - publishers.add( + compiledPublishers.add( new SchedulePublisher( this.config.healthTimeout(), 'health', @@ -75,7 +75,8 @@ public class Connector { ) ) } - this.publishers = publishers + compiledPublishers.addAll(publishers) + this.publishers = compiledPublishers this.executorService = Executors.newCachedThreadPool(); } @@ -130,12 +131,15 @@ public class Connector { connectionOpts.reconnectEnabled(true); this.connection = client.connect(config.url(), config.port(), connectionOpts) for (Publisher p : publishers) { + String address = this.context.get().buildAddressFromLink(p) p.setLink(address,connection.openSender(address)) logger.debug("Registering publisher {}", p) this.context.get().registerPublisher(p) if (p instanceof SchedulePublisher){ + logger.debug("Adding scheduled publisher as scheduled publisher {}", p) + final Publisher threadPublisher = p; this.executorService.submit( new Runnable() { @Override @@ -143,12 +147,12 @@ public class Connector { boolean healthy = true while(healthy && running.get()){ try{ - logger.debug("Processing scheduled executor [{}] {} ",p.key, address) - p.send() - logger.debug("\t waiting for {} = {} ",address,p.delay) - Thread.sleep(p.delay*1000) + logger.debug("Processing scheduled executor [{}] {} ", threadPublisher.key, address) + threadPublisher.send() + logger.debug("\t waiting for {} = {} ",address, threadPublisher.delay) + Thread.sleep(threadPublisher.delay*1000) }catch (Exception e){ - logger.error("Error processing scheduled executor [{}] - disabling", p.key,e) + logger.error("Error processing scheduled executor [{}] - disabling", threadPublisher.key,e) healthy=false } } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy index c23b2f3..66f6595 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy @@ -30,7 +30,7 @@ class Context { } void registerConsumers(consumer) { - consumers[publisher.key()] = consumer + consumers[consumer.key()] = consumer } String buildAddressFromLink(Link link) { diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy index 466434c..a47d2bb 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicReference abstract class Handler { private static final Logger logger = LoggerFactory.getLogger(Consumer.class) - public onMessage(String key, String address, Map body, Message message, AtomicReference context){ + public void onMessage(String key, String address, Map body, Message message, AtomicReference context){ logger.debug("Default on message for delivery for {} => {} ({}) = {}", key, address, diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy index a52c2bc..cde5678 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy @@ -17,7 +17,7 @@ abstract class ConnectorHandler { } - public onReady(AtomicReference context){ + public void onReady(AtomicReference context){ }