Adding SyncedPublisher

Fixing

https: //bugs.launchpad.net/nebulous/+bug/2053126
https: //bugs.launchpad.net/nebulous/+bug/2053030
Change-Id: I44cc19a75f897dc7814b8ac3b5e42149f095eb7b
This commit is contained in:
Fotis Paraskevopoulos 2024-01-24 14:48:53 +02:00 committed by Rudi Schlatte
parent f4777a3fb5
commit 42103992a4
7 changed files with 117 additions and 16 deletions

View File

@ -0,0 +1,102 @@
package eu.nebulouscloud.exn;
import eu.nebulouscloud.exn.core.*;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import org.apache.qpid.protonj2.client.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class MyStopOnDemmandHandler extends ConnectorHandler {
Logger logger = LoggerFactory.getLogger(MySyncedHandler.class);
@Override
public void onReady(Context context) {
if (context.hasPublisher("synced")) {
logger.debug("Sending synced");
Map ret = ((SyncedPublisher) context.getPublisher("synced")).sendSync(
Map.of("message", "hello world"),
null, null, false
);
logger.debug("Received synced " + ret);
}
}
}
class TestImmediateStop {
static Logger logger = LoggerFactory.getLogger(TestSyncedPublisher.class);
public static void main(String[] args) {
try {
Connector c = new Connector(
"ui",
new MySyncedHandler(),
List.of(),
List.of(
new Consumer("synced_consumer", "synced", new Handler() {
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
logger.debug("Replying in 5 seconds to " + context.getPublisher("synced_reply").address());
Executors.newSingleThreadScheduledExecutor().schedule(
new Runnable() {
@Override
public void run() {
try {
Map b = Map.of("correlation-id", message.correlationId());
context.getPublisher("synced_reply").send(
Map.of("all", "good"),
null,
b
);
} catch (Exception e) {
logger.error("Error replying", e);
}
}
}, 5, TimeUnit.SECONDS
);
}
}, true)
),
false,
false,
new StaticExnConfig(
"localhost",
5672,
"admin",
"admin"
)
);
c.start();
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
System.out.println("Force Stop" );
c.stop();
}
}, 2, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -112,7 +112,6 @@ public class Connector {
public void stop() {
this.context.stop()
def executor = Executors.newSingleThreadScheduledExecutor()
executor
.schedule(new Runnable() {

View File

@ -139,18 +139,17 @@ class Context {
void stop(){
publishers.each {p -> {
p.setActive(false)
p.link.close()
}}
consumers.each {p -> {
p.setActive(false)
p.link.close()
}}
manager.stop()
publishers.each {p -> {
p.link.close()
p.setActive(false)
}}
consumers.each {p -> {
p.link.close()
p.setActive(false)
}}
}
String buildAddressFromLink(Link link) {

View File

@ -46,8 +46,6 @@ abstract class Link<T extends org.apache.qpid.protonj2.client.Link<T>>{
this.linkAddress =address
}
boolean getActive() {
return active
}

View File

@ -75,8 +75,8 @@ class Manager {
* @param consumers
*/
protected void startPublisher(Context context, Publisher publisher) {
logger.debug("Registering publisher {}", publisher)
String address = context.buildAddressFromLink(publisher)
logger.debug("Registering publisher {} {} ", publisher,address)
publisher.setLink(address,this.connection.openSender(address))
if (publisher instanceof SchedulePublisher){

View File

@ -58,6 +58,8 @@ class Publisher extends Link<Sender> {
* This method send the body without filtering
* on a specific application.
*
* This method should be overriden
*
* @param body
* @return
*/
@ -98,7 +100,7 @@ class Publisher extends Link<Sender> {
*/
public void send(Map body, String application, Map<String,String> properties, boolean raw) {
logger.debug("{} Sending {}-> {} ", this.address, body,properties)
logger.debug("{} Sending {}-> {} ", this.linkAddress, body,properties)
if(body == null){
body = [:] as Map
}
@ -106,6 +108,7 @@ class Publisher extends Link<Sender> {
def message = this.prepareMessage(body, properties, raw)
if(application != null && application != ''){
message.subject(application)
message.property('application',application)
}
Tracker tracker = this.link.send(message)
tracker.awaitSettlement();

View File

@ -67,7 +67,7 @@ class SyncedPublisher extends Publisher{
Future<Map> res= executor.submit {
while (replied.get() == null) {
Thread.sleep(500)
Thread.sleep(50)
}
}
correlationIds.put(correlationId, res)