Correcting an invalid look variable which was

not thread safe.

Fixing handling of lists in constructors

Change-Id: I491eb7bc8082a6c877b974af72947e45e24eae46
This commit is contained in:
Fotis Paraskevopoulos 2023-11-29 12:10:18 +01:00
parent 569aa6a9a1
commit ebf51c3235
6 changed files with 22 additions and 19 deletions

View File

@ -33,7 +33,7 @@ class MyConnectorHandler extends ConnectorHandler {
@Override
def onReady(AtomicReference<Context> context) {
def void onReady(AtomicReference<Context> context) {
println ("Ready")

View File

@ -6,22 +6,21 @@ import eu.nebulouscloud.exn.handlers.ConnectorHandler
import eu.nebulouscloud.exn.settings.StaticExnConfig
import org.apache.qpid.protonj2.client.Message
import org.apache.qpid.protonj2.client.exceptions.ClientException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference
class MyConnectorHandler extends ConnectorHandler {
@Override
def onReady(AtomicReference<Context> context) {
def void onReady(AtomicReference<Context> context) {
println ("Ready start working")
}
}
class MyCustomConsumerHandler extends Handler{
@Override
def onMessage(String key, String address, Map body, Message message, AtomicReference<Context> context) {
def void onMessage(String key, String address, Map body, Message message, AtomicReference<Context> context) {
println "Received by custom handler ${key} => ${address} = ${body}"
}
}
@ -38,7 +37,7 @@ public static void main(String[] args) {
new Consumer("ui_health","health", new MyCustomConsumerHandler(), true),
new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){
@Override
def Object onMessage(String key, String address, Map body, Message rawMessage, AtomicReference<Context> context) {
def void onMessage(String key, String address, Map body, Message rawMessage, AtomicReference<Context> context) {
if(key == "ui_all"){
println "These are my preferences => ${body}"
}
@ -46,7 +45,7 @@ public static void main(String[] args) {
},true,true),
],
new StaticExnConfig(
'localhosts',
'localhost',
5672,
"admin",
"admin"

View File

@ -57,15 +57,15 @@ public class Connector {
)
)
List<Publisher> compiledPublishers = new ArrayList<>()
if (enableState) {
publishers.add(
compiledPublishers.add(
new StatePublisher()
)
}
if (enableHealth) {
publishers.add(
compiledPublishers.add(
new SchedulePublisher(
this.config.healthTimeout(),
'health',
@ -75,7 +75,8 @@ public class Connector {
)
)
}
this.publishers = publishers
compiledPublishers.addAll(publishers)
this.publishers = compiledPublishers
this.executorService = Executors.newCachedThreadPool();
}
@ -130,12 +131,15 @@ public class Connector {
connectionOpts.reconnectEnabled(true);
this.connection = client.connect(config.url(), config.port(), connectionOpts)
for (Publisher p : publishers) {
String address = this.context.get().buildAddressFromLink(p)
p.setLink(address,connection.openSender(address))
logger.debug("Registering publisher {}", p)
this.context.get().registerPublisher(p)
if (p instanceof SchedulePublisher){
logger.debug("Adding scheduled publisher as scheduled publisher {}", p)
final Publisher threadPublisher = p;
this.executorService.submit(
new Runnable() {
@Override
@ -143,12 +147,12 @@ public class Connector {
boolean healthy = true
while(healthy && running.get()){
try{
logger.debug("Processing scheduled executor [{}] {} ",p.key, address)
p.send()
logger.debug("\t waiting for {} = {} ",address,p.delay)
Thread.sleep(p.delay*1000)
logger.debug("Processing scheduled executor [{}] {} ", threadPublisher.key, address)
threadPublisher.send()
logger.debug("\t waiting for {} = {} ",address, threadPublisher.delay)
Thread.sleep(threadPublisher.delay*1000)
}catch (Exception e){
logger.error("Error processing scheduled executor [{}] - disabling", p.key,e)
logger.error("Error processing scheduled executor [{}] - disabling", threadPublisher.key,e)
healthy=false
}
}

View File

@ -30,7 +30,7 @@ class Context {
}
void registerConsumers(consumer) {
consumers[publisher.key()] = consumer
consumers[consumer.key()] = consumer
}
String buildAddressFromLink(Link link) {

View File

@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicReference
abstract class Handler {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class)
public onMessage(String key, String address, Map body, Message message, AtomicReference<Context> context){
public void onMessage(String key, String address, Map body, Message message, AtomicReference<Context> context){
logger.debug("Default on message for delivery for {} => {} ({}) = {}",
key,
address,

View File

@ -17,7 +17,7 @@ abstract class ConnectorHandler {
}
public onReady(AtomicReference<Context> context){
public void onReady(AtomicReference<Context> context){
}