diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestImmediateStop.java b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestImmediateStop.java new file mode 100644 index 0000000..fd5d092 --- /dev/null +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestImmediateStop.java @@ -0,0 +1,102 @@ +package eu.nebulouscloud.exn; + + +import eu.nebulouscloud.exn.core.*; +import eu.nebulouscloud.exn.handlers.ConnectorHandler; +import eu.nebulouscloud.exn.settings.StaticExnConfig; +import org.apache.qpid.protonj2.client.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +class MyStopOnDemmandHandler extends ConnectorHandler { + + Logger logger = LoggerFactory.getLogger(MySyncedHandler.class); + + @Override + public void onReady(Context context) { + + + if (context.hasPublisher("synced")) { + + logger.debug("Sending synced"); + Map ret = ((SyncedPublisher) context.getPublisher("synced")).sendSync( + Map.of("message", "hello world"), + null, null, false + ); + logger.debug("Received synced " + ret); + + } + } +} + + +class TestImmediateStop { + + static Logger logger = LoggerFactory.getLogger(TestSyncedPublisher.class); + + public static void main(String[] args) { + try { + Connector c = new Connector( + "ui", + new MySyncedHandler(), + List.of(), + List.of( + new Consumer("synced_consumer", "synced", new Handler() { + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + + logger.debug("Replying in 5 seconds to " + context.getPublisher("synced_reply").address()); + Executors.newSingleThreadScheduledExecutor().schedule( + new Runnable() { + @Override + public void run() { + try { + Map b = Map.of("correlation-id", message.correlationId()); + context.getPublisher("synced_reply").send( + Map.of("all", "good"), + null, + b + ); + } catch (Exception e) { + logger.error("Error replying", e); + } + + } + }, 5, TimeUnit.SECONDS + ); + + + } + }, true) + ), + false, + false, + new StaticExnConfig( + "localhost", + 5672, + "admin", + "admin" + ) + ); + c.start(); + + Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() { + @Override + public void run() { + System.out.println("Force Stop" ); + c.stop(); + } + }, 2, TimeUnit.SECONDS); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} + + diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy index 7c0c6a1..432fd60 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy @@ -112,7 +112,6 @@ public class Connector { public void stop() { this.context.stop() - def executor = Executors.newSingleThreadScheduledExecutor() executor .schedule(new Runnable() { diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy index 9aa3b0e..72ce889 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy @@ -139,18 +139,17 @@ class Context { void stop(){ - publishers.each {p -> { - p.setActive(false) - p.link.close() - }} - - consumers.each {p -> { - p.setActive(false) - p.link.close() - }} - manager.stop() + publishers.each {p -> { + p.link.close() + p.setActive(false) + }} + consumers.each {p -> { + p.link.close() + p.setActive(false) + }} + } String buildAddressFromLink(Link link) { diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy index a50cc77..9c99694 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy @@ -46,8 +46,6 @@ abstract class Link>{ this.linkAddress =address } - - boolean getActive() { return active } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy index 4204cef..fdc5bc7 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy @@ -75,8 +75,8 @@ class Manager { * @param consumers */ protected void startPublisher(Context context, Publisher publisher) { - logger.debug("Registering publisher {}", publisher) String address = context.buildAddressFromLink(publisher) + logger.debug("Registering publisher {} {} ", publisher,address) publisher.setLink(address,this.connection.openSender(address)) if (publisher instanceof SchedulePublisher){ diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy index a49bd4f..59fa86b 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy @@ -58,6 +58,8 @@ class Publisher extends Link { * This method send the body without filtering * on a specific application. * + * This method should be overriden + * * @param body * @return */ @@ -98,7 +100,7 @@ class Publisher extends Link { */ public void send(Map body, String application, Map properties, boolean raw) { - logger.debug("{} Sending {}-> {} ", this.address, body,properties) + logger.debug("{} Sending {}-> {} ", this.linkAddress, body,properties) if(body == null){ body = [:] as Map } @@ -106,6 +108,7 @@ class Publisher extends Link { def message = this.prepareMessage(body, properties, raw) if(application != null && application != ''){ message.subject(application) + message.property('application',application) } Tracker tracker = this.link.send(message) tracker.awaitSettlement(); diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy index 1d188db..f32de8f 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy @@ -67,7 +67,7 @@ class SyncedPublisher extends Publisher{ Future res= executor.submit { while (replied.get() == null) { - Thread.sleep(500) + Thread.sleep(50) } } correlationIds.put(correlationId, res)