Adding SyncedPublisher

Change-Id: Ib18242347f43fbd19f39384f914b2d95d97e398d
This commit is contained in:
Fotis Paraskevopoulos 2024-02-08 11:41:28 +02:00
parent aaffb05e01
commit f4777a3fb5
4 changed files with 210 additions and 0 deletions

View File

@ -0,0 +1,99 @@
package eu.nebulouscloud.exn;
import eu.nebulouscloud.exn.core.*;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import org.apache.qpid.protonj2.client.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class MySyncedHandler extends ConnectorHandler {
Logger logger = LoggerFactory.getLogger(MySyncedHandler.class);
@Override
public void onReady(Context context) {
if (context.hasPublisher("synced")) {
logger.debug("Sending synced");
Map ret = ((SyncedPublisher) context.getPublisher("synced")).sendSync(
Map.of("message", "hello world"),
null, null, false
);
logger.debug("Received synced " + ret);
}
}
}
class TestSyncedPublisher {
static Logger logger = LoggerFactory.getLogger(TestSyncedPublisher.class);
public static void main(String[] args) {
try {
Connector c = new Connector(
"ui",
new MySyncedHandler(),
List.of(
new SyncedPublisher("synced", "synced", true),
new Publisher("synced_reply", "synced.reply", true)
),
List.of(
new Consumer("synced_consumer", "synced", new Handler() {
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
logger.debug("Replying in 5 seconds to " + context.getPublisher("synced_reply").address());
Executors.newSingleThreadScheduledExecutor().schedule(
new Runnable() {
@Override
public void run() {
try {
Map b = Map.of("correlation-id", message.correlationId());
context.getPublisher("synced_reply").send(
Map.of("all", "good"),
null,
b
);
} catch (Exception e) {
logger.error("Error replying", e);
}
}
}, 5, TimeUnit.SECONDS
);
}
}, true)
),
false,
false,
new StaticExnConfig(
"localhost",
5672,
"admin",
"admin"
)
);
c.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -2,6 +2,7 @@ package eu.nebulouscloud.exn.core
import org.apache.qpid.protonj2.client.Connection
import org.apache.qpid.protonj2.client.Delivery
import org.apache.qpid.protonj2.client.Message
import org.apache.qpid.protonj2.client.Receiver
import org.apache.qpid.protonj2.client.ReceiverOptions
import org.apache.qpid.protonj2.client.Session
@ -101,6 +102,23 @@ class Manager {
)
}
if(publisher instanceof SyncedPublisher){
final p = publisher
startConsumer(context, new Consumer(
publisher.key+"-reply",
publisher.replyAddress,
new Handler() {
@Override
void onMessage(String key, String onAddress, Map body, Message message, Context onAcontext) {
if(p.hasCorrelationId(message.correlationId())){
p.replied.set(body)
}
}
},
publisher.replyTopic,
publisher.replyFQDN
))
}
}

View File

@ -10,6 +10,11 @@ import javax.swing.MenuSelectionManager
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/**
* This is the core publisher class which abstract the logic to

View File

@ -0,0 +1,88 @@
package eu.nebulouscloud.exn.core
import org.apache.qpid.protonj2.client.Message
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicReference
/**
* This is the core publisher class which abstract the logic to
* publish events.
*
*
* Using this class you define the AMQP address for which you wish
* to publish messages.
*
* The class takes care of preparing the the {@link Message} including
* content-type, message payload, and serialization
*
*/
class SyncedPublisher extends Publisher{
private static final Logger logger = LoggerFactory.getLogger(SyncedPublisher.class)
public String replyAddress
public boolean replyTopic
public boolean replyFQDN
protected AtomicReference<Map> replied
private ConcurrentHashMap<String,Future<Map>> correlationIds
private ExecutorService executor
SyncedPublisher(String key, String address, boolean Topic, boolean FQDN=false,
String replyAddress="reply") {
super(key, address, Topic, FQDN)
this.replyAddress = this.address+"."+replyAddress
this.replyTopic = Topic
this.replyFQDN = FQDN
this.replied = new AtomicReference<>()
this.correlationIds = new ConcurrentHashMap<>()
this.executor= Executors.newSingleThreadExecutor()
}
public Map sendSync(Map body, String application, Map<String,String> properties, boolean raw) {
String correlationId = UUID.randomUUID().toString().replace("-", "")
Map<String,String> sendPropeties = new HashMap<>()
if(properties != null){
sendPropeties.putAll(properties)
if(properties.containsKey("correlation-id")){
correlationId = properties.get("correlation-id")
}
}else{
sendPropeties.put("correlation-id",correlationId)
}
this.replied.set(null)
Future<Map> res= executor.submit {
while (replied.get() == null) {
Thread.sleep(500)
}
}
correlationIds.put(correlationId, res)
send(body,application,sendPropeties,raw)
res.get()
Map ret = this.replied.get()
this.replied.set(null)
return ret
}
public boolean hasCorrelationId(String correlationId){
return this.correlationIds.containsKey(correlationId)
}
}