From f4777a3fb56fee8180b4bc7584b166405f7b547c Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Thu, 8 Feb 2024 11:41:28 +0200 Subject: [PATCH] Adding SyncedPublisher Change-Id: Ib18242347f43fbd19f39384f914b2d95d97e398d --- .../nebulouscloud/exn/TestSyncPublisher.java | 99 +++++++++++++++++++ .../eu/nebulouscloud/exn/core/Manager.groovy | 18 ++++ .../nebulouscloud/exn/core/Publisher.groovy | 5 + .../exn/core/SyncedPublisher.groovy | 88 +++++++++++++++++ 4 files changed, 210 insertions(+) create mode 100644 exn-connector/src/main/examples/eu/nebulouscloud/exn/TestSyncPublisher.java create mode 100644 exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestSyncPublisher.java b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestSyncPublisher.java new file mode 100644 index 0000000..10d321f --- /dev/null +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestSyncPublisher.java @@ -0,0 +1,99 @@ +package eu.nebulouscloud.exn; + + +import eu.nebulouscloud.exn.core.*; +import eu.nebulouscloud.exn.handlers.ConnectorHandler; +import eu.nebulouscloud.exn.settings.StaticExnConfig; +import org.apache.qpid.protonj2.client.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +class MySyncedHandler extends ConnectorHandler { + + Logger logger = LoggerFactory.getLogger(MySyncedHandler.class); + + @Override + public void onReady(Context context) { + + + if (context.hasPublisher("synced")) { + + logger.debug("Sending synced"); + Map ret = ((SyncedPublisher) context.getPublisher("synced")).sendSync( + Map.of("message", "hello world"), + null, null, false + ); + logger.debug("Received synced " + ret); + + } + } +} + + +class TestSyncedPublisher { + + static Logger logger = LoggerFactory.getLogger(TestSyncedPublisher.class); + + public static void main(String[] args) { + try { + Connector c = new Connector( + "ui", + new MySyncedHandler(), + List.of( + new SyncedPublisher("synced", "synced", true), + new Publisher("synced_reply", "synced.reply", true) + ), + List.of( + new Consumer("synced_consumer", "synced", new Handler() { + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + + logger.debug("Replying in 5 seconds to " + context.getPublisher("synced_reply").address()); + + Executors.newSingleThreadScheduledExecutor().schedule( + new Runnable() { + @Override + public void run() { + try { + Map b = Map.of("correlation-id", message.correlationId()); + context.getPublisher("synced_reply").send( + Map.of("all", "good"), + null, + b + ); + } catch (Exception e) { + logger.error("Error replying", e); + } + + } + }, 5, TimeUnit.SECONDS + ); + + + } + }, true) + ), + false, + false, + new StaticExnConfig( + "localhost", + 5672, + "admin", + "admin" + ) + ); + c.start(); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} + + diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy index 04ea6ce..4204cef 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy @@ -2,6 +2,7 @@ package eu.nebulouscloud.exn.core import org.apache.qpid.protonj2.client.Connection import org.apache.qpid.protonj2.client.Delivery +import org.apache.qpid.protonj2.client.Message import org.apache.qpid.protonj2.client.Receiver import org.apache.qpid.protonj2.client.ReceiverOptions import org.apache.qpid.protonj2.client.Session @@ -101,6 +102,23 @@ class Manager { ) } + if(publisher instanceof SyncedPublisher){ + final p = publisher + startConsumer(context, new Consumer( + publisher.key+"-reply", + publisher.replyAddress, + new Handler() { + @Override + void onMessage(String key, String onAddress, Map body, Message message, Context onAcontext) { + if(p.hasCorrelationId(message.correlationId())){ + p.replied.set(body) + } + } + }, + publisher.replyTopic, + publisher.replyFQDN + )) + } } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy index d4da0a8..a49bd4f 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy @@ -10,6 +10,11 @@ import javax.swing.MenuSelectionManager import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit /** * This is the core publisher class which abstract the logic to diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy new file mode 100644 index 0000000..1d188db --- /dev/null +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SyncedPublisher.groovy @@ -0,0 +1,88 @@ +package eu.nebulouscloud.exn.core + + +import org.apache.qpid.protonj2.client.Message +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicReference + +/** + * This is the core publisher class which abstract the logic to + * publish events. + * + * + * Using this class you define the AMQP address for which you wish + * to publish messages. + * + * The class takes care of preparing the the {@link Message} including + * content-type, message payload, and serialization + * + */ + + +class SyncedPublisher extends Publisher{ + + private static final Logger logger = LoggerFactory.getLogger(SyncedPublisher.class) + + public String replyAddress + public boolean replyTopic + public boolean replyFQDN + protected AtomicReference replied + private ConcurrentHashMap> correlationIds + private ExecutorService executor + + SyncedPublisher(String key, String address, boolean Topic, boolean FQDN=false, + String replyAddress="reply") { + super(key, address, Topic, FQDN) + this.replyAddress = this.address+"."+replyAddress + this.replyTopic = Topic + this.replyFQDN = FQDN + this.replied = new AtomicReference<>() + this.correlationIds = new ConcurrentHashMap<>() + this.executor= Executors.newSingleThreadExecutor() + } + + + public Map sendSync(Map body, String application, Map properties, boolean raw) { + String correlationId = UUID.randomUUID().toString().replace("-", "") + + Map sendPropeties = new HashMap<>() + + if(properties != null){ + sendPropeties.putAll(properties) + if(properties.containsKey("correlation-id")){ + correlationId = properties.get("correlation-id") + + } + }else{ + sendPropeties.put("correlation-id",correlationId) + } + + this.replied.set(null) + + Future res= executor.submit { + while (replied.get() == null) { + Thread.sleep(500) + } + } + correlationIds.put(correlationId, res) + + + send(body,application,sendPropeties,raw) + res.get() + Map ret = this.replied.get() + this.replied.set(null) + return ret + } + + public boolean hasCorrelationId(String correlationId){ + return this.correlationIds.containsKey(correlationId) + } + + +}