Adding tests

Adding javadocs
Adding test

Bugs:

Bug 2047143
Bug 2047131
Bug 2047058

Correcting an invalid look variable which was
not thread safe.

Fixing handling of lists in constructors

Change-Id: I13a6d263a485f0fef551356b7f40475c97601ae6
This commit is contained in:
Fotis Paraskevopoulos 2023-11-29 12:10:18 +01:00
parent ebf51c3235
commit 1c908a05ce
19 changed files with 939 additions and 353 deletions

@ -1,10 +1,3 @@
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Groovy library project to get you started.
* For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
* User Manual available at https://docs.gradle.org/7.5.1/userguide/building_java_projects.html
*/
plugins {
// Apply the groovy Plugin to add support for Groovy.
@ -13,7 +6,6 @@ plugins {
// Apply the java-library plugin for API and implementation separation.
id 'java-library'
}
repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
@ -33,8 +25,6 @@ dependencies {
testImplementation 'org.spockframework:spock-core:2.1-groovy-3.0'
testImplementation 'junit:junit:4.13.2'
// This dependency is exported to consumers, that is to say found on their compile classpath.
api 'org.apache.commons:commons-math3:3.6.1'
}
tasks.named('test') {
@ -43,7 +33,16 @@ tasks.named('test') {
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11))
// Set the Java version for source and target compatibility
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
sourceSets {
main{
groovy{
srcDirs = ['src/main/groovy','src/main/resources','src/main/examples']
}
}
}

@ -8,4 +8,3 @@
*/
rootProject.name = 'eu.nebulouscloud.exn'
include('examples')

@ -1,114 +0,0 @@
import eu.nebulouscloud.exn.Connector
import eu.nebulouscloud.exn.core.Consumer
import eu.nebulouscloud.exn.core.Context
import eu.nebulouscloud.exn.core.Publisher
import eu.nebulouscloud.exn.core.StatePublisher
import eu.nebulouscloud.exn.handlers.ConnectorHandler
import eu.nebulouscloud.exn.settings.ExnConfig
import eu.nebulouscloud.exn.settings.StaticExnConfig
import org.apache.qpid.protonj2.client.Message
import org.apache.qpid.protonj2.client.exceptions.ClientException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference
class MyPublisher extends Publisher{
MyPublisher() {
super('preferences', 'preferences.changed', true)
}
public send(){
super.send([
"preferences": [
"dark_mode": true
]
])
}
}
class MyConnectorHandler extends ConnectorHandler {
@Override
def void onReady(AtomicReference<Context> context) {
println ("Ready")
/*
Here we are checking to see inf the default `state` publisher is
available. Even though this should be already be known by the
developer, a check never did any harm.
The state publisher is a core publisher with the required
methods to pubilsh component state.
Calling these methods and bootstraing them into the application
logic falls on the developer.
*/
if(context.get().hasPublisher('state')){
StatePublisher sp = context.get().getPublisher("state") as StatePublisher
sp.starting()
sp.started()
sp.custom("forecasting")
sp.stopping()
sp.stopped()
}
/**
* This is an example of a default Publisher just sending an arbitrary message
*
*/
(context.get().getPublisher("config") as Publisher).send([
"hello": "world"
] as Map)
/**
* This is an example of an extended publisher where the body of the message
* is managed internally by the class
*/
(context.get().getPublisher("preferences") as MyPublisher).send()
}
}
public static void main(String[] args) {
try {
Connector c = new Connector(
"ui",
new MyConnectorHandler(),
[
new Publisher("config","config",true),
new MyPublisher()
],
[],
true,
true,
new StaticExnConfig(
'localhost',
5672,
"admin",
"admin"
)
)
c.start()
} catch (ClientException e) {
e.printStackTrace();
}
}

@ -0,0 +1,122 @@
package eu.nebulouscloud.exn;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Publisher;
import eu.nebulouscloud.exn.core.StatePublisher;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import java.util.List;
import java.util.Map;
class MyPublisher extends Publisher {
public MyPublisher() {
super("preferences", "preferences.changed", true);
}
public void send(){
super.send(Map.of(
"dark_mode",true
),"a");
}
}
class MyPublisherTestConnectorHandler extends ConnectorHandler {
@Override
public void onReady(Context context) {
System.out.println("Ready");
/*
Here we are checking to see inf the default `state` publisher is
available. Even though this should be already be known by the
developer, a check never did any harm.
The state publisher is a core publisher with the required
methods to pubilsh component state.
Calling these methods and bootstraing them into the application
logic falls on the developer.
*/
if(context.hasPublisher("state")){
StatePublisher sp = (StatePublisher) context.getPublisher("state");
sp.starting();
sp.started();
sp.custom("forecasting");
sp.stopping();
sp.stopped();
}
/**
* This is an example of a default Publisher just sending an arbitrary message
*
*/
if(context.hasPublisher("config")) {
(context.getPublisher("config")).send(
Map.of("hello","world"),
"one"
);
(context.getPublisher("config")).send(
Map.of("hello","world"),
"two"
);
}
/**
* This is an example of an extended publisher where the body of the message
* is managed internally by the class
*/
(context.getPublisher("preferences")).send();
}
}
class TestPublisher{
public static void main(String[] args) {
try {
Connector c = new Connector(
"ui",
new MyPublisherTestConnectorHandler(),
List.of(
new Publisher("config","config",true),
new MyPublisher()
),
List.of(),
true,
true,
new StaticExnConfig(
"localhost",
5672,
"admin",
"admin"
)
);
c.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

@ -1,59 +0,0 @@
import eu.nebulouscloud.exn.Connector
import eu.nebulouscloud.exn.core.Consumer
import eu.nebulouscloud.exn.core.Context
import eu.nebulouscloud.exn.core.Handler
import eu.nebulouscloud.exn.handlers.ConnectorHandler
import eu.nebulouscloud.exn.settings.StaticExnConfig
import org.apache.qpid.protonj2.client.Message
import org.apache.qpid.protonj2.client.exceptions.ClientException
import java.util.concurrent.atomic.AtomicReference
class MyConnectorHandler extends ConnectorHandler {
@Override
def void onReady(AtomicReference<Context> context) {
println ("Ready start working")
}
}
class MyCustomConsumerHandler extends Handler{
@Override
def void onMessage(String key, String address, Map body, Message message, AtomicReference<Context> context) {
println "Received by custom handler ${key} => ${address} = ${body}"
}
}
public static void main(String[] args) {
try {
Connector c = new Connector(
"ui",
new MyConnectorHandler(),
[],
[
new Consumer("ui_health","health", new MyCustomConsumerHandler(), true),
new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){
@Override
def void onMessage(String key, String address, Map body, Message rawMessage, AtomicReference<Context> context) {
if(key == "ui_all"){
println "These are my preferences => ${body}"
}
}
},true,true),
],
new StaticExnConfig(
'localhost',
5672,
"admin",
"admin"
)
)
c.start()
} catch (ClientException e) {
e.printStackTrace();
}
}

@ -0,0 +1,107 @@
package eu.nebulouscloud.exn;
import eu.nebulouscloud.exn.core.Consumer;
import eu.nebulouscloud.exn.core.Context;
import eu.nebulouscloud.exn.core.Handler;
import eu.nebulouscloud.exn.handlers.ConnectorHandler;
import eu.nebulouscloud.exn.settings.StaticExnConfig;
import org.apache.qpid.protonj2.client.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Objects;
class MyCustomConsumerHandler extends Handler{
Logger logger = LoggerFactory.getLogger(MyCustomConsumerHandler.class);
@Override
public void onMessage(String key, String address, Map body, Message message, Context context) {
logger.info("Received by custom handler {} => {} = {}", key,address,String.valueOf(body));
}
}
class MyConnectorHandler extends ConnectorHandler {
Logger logger = LoggerFactory.getLogger(MyConnectorHandler.class);
@Override
public void onReady(Context context) {
logger.info ("Ready start working");
context.registerConsumer(new Consumer("ui_health","health", new MyCustomConsumerHandler(), true));
/**
* We can then de-register the consumer
*/
new Thread(){
@Override
public void run() {
try {
logger.debug("Waiting for 50 s to unregister consumer");
Thread.sleep(30000);
context.unregisterConsumer("ui_health");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
class TestReceiver{
public static void main(String[] args) {
try {
Connector c = new Connector(
"ui",
new MyConnectorHandler(),
List.of(),
List.of(
new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){
@Override
public void onMessage(String key, String address, Map body, Message rawMessage, Context context) {
if(Objects.equals(key, "ui_all")){
System.out.println("These are my preferences => "+ String.valueOf(body));
}
}
},true,true),
new Consumer("config_one","config", new Handler(){
@Override
public void onMessage(String key, String address, Map body, Message rawMessage, Context context) {
System.out.println("These are my ONE config => "+ String.valueOf(body));
}
},"one", true),
new Consumer("config_two","config", new Handler(){
@Override
public void onMessage(String key, String address, Map body, Message rawMessage, Context context) {
System.out.println("These are my TWO config => "+ String.valueOf(body));
}
},"two", true)
),
false,
false,
new StaticExnConfig(
"localhost",
5672,
"admin",
"admin"
)
);
c.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

@ -4,30 +4,53 @@ import eu.nebulouscloud.exn.core.*
import eu.nebulouscloud.exn.handlers.ConnectorHandler
import eu.nebulouscloud.exn.settings.ExnConfig
import org.aeonbits.owner.ConfigFactory
import org.apache.qpid.protonj2.client.*
import org.apache.qpid.protonj2.client.Client
import org.apache.qpid.protonj2.client.Connection
import org.apache.qpid.protonj2.client.ConnectionOptions
import org.apache.qpid.protonj2.client.exceptions.ClientException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit
/**
* This is the connector class of the EXNConnector. Through this
* class you connect to the broker, defined the default consumers
* and publishers.
*
* This abstract all the boiler plate required.
*
*/
public class Connector {
private static final Logger logger = LoggerFactory.getLogger(Connector.class)
private static final Logger logger = LoggerFactory.getLogger(Connector.class)
private final String component
private final Publisher[] publishers
private final Consumer[] consumers
private final ExecutorService executorService
private Connection connection
private ExnConfig config
private final AtomicBoolean running
private final AtomicReference<Context> context
private final AtomicReference<ConnectorHandler> handler
private final Context context
private Connection connection
private Manager manager
/**
*
* @param component This is the name of your component. It will be used in the
* AMPQ addresses generated when FQDN is false. If the {@link ExnConfig#baseName()}
* is 'eu.nebulouscloud' and your component name is 'ui', and your {@link Link#address} has
* a value of 'config', then the AMQP address will be 'eu.nebulouscloud.ui'
* @param handler This is the {@link ConnectorHandler} which will be called once the initialization process
* is complete
* @param publishers A list of publisher which will be ready upon initialization and added to the
* {@link Context} automatically
* @param consumers A list of consumers which will be ready upon initialization and added to the
* {@link Context} automatically
* @param enableState This will enable the default {@link StatePublisher}
* @param enableHealth This will enable the default {@link SchedulePublisher} which will ping
* the components health at {@link ExnConfig#healthTimeout()}
*
* @param configuration An optional {@link eu.nebulouscloud.exn.settings.StaticExnConfig} instance
*/
public Connector(
String component,
ConnectorHandler handler,
@ -40,22 +63,16 @@ public class Connector {
assert component
this.component = component
this.consumers = consumers
this.running = new AtomicBoolean(true);
this.handler = new AtomicReference<>(handler)
this.config = ConfigFactory.create(ExnConfig.class)
if (configuration == null ){
configuration = ConfigFactory.create(ExnConfig.class)
}
this.config = configuration
this.context = new AtomicReference<>(
new Context(
"${configuration .url()}:${configuration .port()}",
"${configuration .baseName()}.${this.component}"
this.context = new Context(
"${configuration.url()}:${configuration.port()}",
"${configuration.baseName()}.${this.component}",
handler
)
)
List<Publisher> compiledPublishers = new ArrayList<>()
if (enableState) {
@ -76,111 +93,61 @@ public class Connector {
)
}
compiledPublishers.addAll(publishers)
this.publishers = compiledPublishers
this.executorService = Executors.newCachedThreadPool();
}
private void startQueueListener(Consumer consumer) {
executorService.submit(new Runnable() {
@Override
void run() {
String address = context.get().buildAddressFromLink(consumer)
try {
Session session = connection.openSession().openFuture().get();
Receiver receiver = session.openReceiver(address).openFuture().get();
consumer.setLink(address,receiver)
while (running.get()) {
Delivery delivery = receiver.receive();
if (delivery != null) {
consumer.onDelivery(delivery, context)
}
}
receiver.close();
session.close();
} catch (ClientException e) {
logger.error("Client exception for {} ",address,e)
} catch (Exception e){
logger.error("General exception for {} ",address,e)
}
}
});
}
for( Consumer c : consumers){
public void stop() {
try {
running.set(false)
connection.close()
executorService.shutdown()
logger.info("Connector stopped gracefully.")
} catch (ClientException e) {
logger.error("Error stopping connector ", e)
this.context.registerConsumer(c)
}
for( Publisher p : compiledPublishers){
this.context.registerPublisher(p)
}
}
public start() {
/**
* Stop everything
*/
public void stop() {
this.context.stop()
def executor = Executors.newSingleThreadScheduledExecutor()
executor
.schedule(new Runnable() {
@Override
void run() {
try {
connection.close()
logger.info("Connector stopped gracefully.")
} catch (ClientException e) {
}
executor.shutdown()
}
},10, TimeUnit.SECONDS)
}
/**
* Start everythin
*/
public void start() {
logger.info("Starting connector...")
try {
final Client client = Client.create();
final ConnectionOptions connectionOpts = new ConnectionOptions();
connectionOpts.user(config.username());
connectionOpts.password(config.password());
connectionOpts.reconnectEnabled(true);
this.connection = client.connect(config.url(), config.port(), connectionOpts)
for (Publisher p : publishers) {
String address = this.context.get().buildAddressFromLink(p)
p.setLink(address,connection.openSender(address))
logger.debug("Registering publisher {}", p)
this.context.get().registerPublisher(p)
if (p instanceof SchedulePublisher){
logger.debug("Adding scheduled publisher as scheduled publisher {}", p)
final Publisher threadPublisher = p;
this.executorService.submit(
new Runnable() {
@Override
void run() {
boolean healthy = true
while(healthy && running.get()){
try{
logger.debug("Processing scheduled executor [{}] {} ", threadPublisher.key, address)
threadPublisher.send()
logger.debug("\t waiting for {} = {} ",address, threadPublisher.delay)
Thread.sleep(threadPublisher.delay*1000)
}catch (Exception e){
logger.error("Error processing scheduled executor [{}] - disabling", threadPublisher.key,e)
healthy=false
}
}
}
}
)
}
}
for (Consumer c : consumers) {
logger.debug("Registering consumer {}", c)
this.context.get().registerConsumers(c)
this.startQueueListener(c)
}
this.executorService.submit(new Runnable() {
@Override
void run() {
while (running.get()){
Thread.sleep(1000)
}
}
})
this.handler.get().setReady(this.context)
this.manager = new Manager(this.connection)
this.context.setManager(manager)
} catch (Exception e) {
logger.error("Error starting connector", e)

@ -1,25 +1,75 @@
package eu.nebulouscloud.exn.core
import eu.nebulouscloud.exn.handlers.ConnectorHandler
import org.apache.qpid.protonj2.client.Delivery
import org.apache.qpid.protonj2.client.Message
import org.apache.qpid.protonj2.client.Receiver
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference
/**
* This is the core consumer class which abstract the logic to
* receive the event.
*
*
* Using this class you define the AMQP address for which you wish
* to receive messages.
*
* Once a message is received this can then be handled by a {@link Handler}
* instance
*
*/
class Consumer extends Link<Receiver>{
private static final Logger logger = LoggerFactory.getLogger(Consumer.class)
private Handler handler
private String application
Consumer(String key, String address, Handler handler, boolean topic=true, boolean FQDN=false) {
/**
*
* @param key This is unique identifier of the Consumer.
* @param address This is the AMQP address which will be appended to the
* {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} for example
* if the base name is "foo", and the component is "bar" and the address is "hello"
* the AMQP address will be compiled as "foo.bar.hello"
*
* @param handler This is {@link Handler} class which you will use the process the message
* @param application This is an optional key to filter messages for a specific application
* @param topic A boolean parameter defining wether the address relates to a topic of a queue
* if it is a topic then "topic://" will be pre-appended to the address so the
* result will be "topic://foo.bar.hello"
* @param FQDN - If you wish to ignore the {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()}
* and subscribe to an arbitrary address, then set this to true, and you are
* responsible for writing the fully qualified address for the {@link #address}
* parameter
*/
Consumer(String key, String address, Handler handler, String application, boolean topic=true, boolean FQDN=false) {
super(key, address, topic, FQDN)
this.handler = handler
this.application = application
}
public onDelivery(Delivery delivery, AtomicReference<Context> context){
Consumer(String key, String address, Handler handler, boolean topic=true, boolean FQDN=false) {
this(key, address, handler, null, topic,FQDN)
}
public boolean hasApplication(){
return this.application != null
}
public String getAplication(){
return this.application
}
protected Map processMessage(Message message, Context context){
logger.debug("Processing message for{}",this.linkAddress)
return (Map)message.body()
}
protected void onDelivery(Delivery delivery, Context context){
logger.debug("Default on delivery for delivery for {}",this.linkAddress)
Message message = delivery.message();
@ -31,12 +81,7 @@ class Consumer extends Link<Receiver>{
message,
context
)
delivery.accept();
}
public Map processMessage(Message message, AtomicReference<Context> context){
logger.debug("Processing message for{}",this.linkAddress)
return (Map)message.body()
delivery.accept()
}
}

@ -1,20 +1,92 @@
package eu.nebulouscloud.exn.core
import eu.nebulouscloud.exn.handlers.ConnectorHandler
import groovy.util.logging.Log
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference
/**
* This class maintains the application states and is provided
* inside the event loop.
*
* It includes a set of utilities which are helpful during
* setup of your asynchronous application and after its
* initialization.
*
* Through this class you can register consumers, as
* well as publishers, once the event loop has been
* initiated
*
*/
class Context {
Logger logger = LoggerFactory.getLogger(Context.class)
private final String uri
private final String base
private final Map<String,Publisher> publishers = [:]
private final Map<String,Consumer> consumers = [:]
private final ConnectorHandler handler
private Manager manager
public Context(String uri, String base){
this.base = base
public Context(String uri, String base, ConnectorHandler handler){
this.uri = uri
this.base = base
this.handler = handler
}
def getPublisher(key) {
publishers[key]
Manager getManager() {
return manager
}
/**
*
* This method is called when the context is started,
* so it is a good location to initialize the consumers
* already registered.
* @param manager
*/
public void setManager(Manager manager) {
this.manager = manager
logger.info("Registering {} consumers", this.consumers.size())
this.manager.start()
this.consumers.each({
k,v -> {
final Consumer c =v
this.manager.startConsumer(this, c)
}
})
logger.info("Registering {} publishers", this.publishers.size())
this.publishers.each({
k,v -> {
final Publisher p =v
this.manager.startPublisher(this, p)
}
})
this.handler.onReady(this)
}
def Publisher getPublisher(key) {
publishers[key] as Publisher
}
def Consumer getConsumer(key) {
consumers[key] as Consumer
}
boolean hasPublisher(key) {
@ -25,12 +97,53 @@ class Context {
consumers.containsKey(key)
}
void registerPublisher(publisher) {
void registerPublisher(Publisher publisher) {
publishers[publisher.key()] = publisher
if(this.manager !=null && this.manager.getRunning()){
final Publisher p =publisher
this.manager.startPublisher(this,p)
}
}
void registerConsumers(consumer) {
void registerConsumer(Consumer consumer) {
logger.debug("Registering consumer {}=>{}",consumer.key(),consumer.address())
consumers[consumer.key()] = consumer
if(this.manager !=null && this.manager.getRunning()){
final Consumer c = consumer
this.manager.startConsumer(this,c)
}
}
void unregisterConsumer(String key){
logger.debug("Un-Registering consumer {}",key)
if(consumers.containsKey(key)){
Consumer c = consumers.get(key)
c.active=false
consumers.remove(key)
}
}
void unregisterPublisher(String key){
if(publishers.containsKey(key)){
publishers.remove(key)
}
}
void stop(){
publishers.each {p -> {
p.setActive(false)
p.link.close()
}}
consumers.each {p -> {
p.setActive(false)
p.link.close()
}}
manager.stop()
}
String buildAddressFromLink(Link link) {
@ -41,23 +154,5 @@ class Context {
address
}
boolean matchAddress(Link link, event) {
if (!event || !event.message || !event.message.address) {
return false
}
String address = buildAddressFromLink(link)
address == event.message.address
}
String buildAddress(String[] actions, boolean topic = false) {
if (actions.length <= 0) {
return base
}
String address = "${base}.${actions.join('.')}"
if (topic) {
address = "topic://${address}"
}
address
}
}

@ -6,10 +6,24 @@ import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference
/**
* This is the handler class for the {@link Consumer}. You will need to
* create extension of this class in order to handle incoming messages.
*/
abstract class Handler {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class)
public void onMessage(String key, String address, Map body, Message message, AtomicReference<Context> context){
/**
* This is the default handle method, which needs to be overwritten if we
* need to handle
* @param key
* @param address
* @param body
* @param message
* @param context
*/
public void onMessage(String key, String address, Map body, Message message, Context context){
logger.debug("Default on message for delivery for {} => {} ({}) = {}",
key,
address,

@ -1,5 +1,11 @@
package eu.nebulouscloud.exn.core
/**
* This is a base class which abstract the Proton client Link
* code, and provides a basis for the {@link Publisher} and
* {@link Consumer} classes
* @param <T>
*/
abstract class Link<T extends org.apache.qpid.protonj2.client.Link<T>>{
protected String key
@ -8,6 +14,8 @@ abstract class Link<T extends org.apache.qpid.protonj2.client.Link<T>>{
public boolean topic
public boolean fqdn = false
public org.apache.qpid.protonj2.client.Link<T> link
private boolean active
public Link(
String key,
@ -21,13 +29,16 @@ abstract class Link<T extends org.apache.qpid.protonj2.client.Link<T>>{
this.topic = topic
this.address = address
this.key = key
this.active = true
}
public String key(){
return this.key
}
public String address(){
return this.key
return this.address
}
public setLink(String address, org.apache.qpid.protonj2.client.Link<T> link){
@ -35,4 +46,14 @@ abstract class Link<T extends org.apache.qpid.protonj2.client.Link<T>>{
this.linkAddress =address
}
boolean getActive() {
return active
}
void setActive(boolean active) {
this.active = active
}
}

@ -0,0 +1,150 @@
package eu.nebulouscloud.exn.core
import org.apache.qpid.protonj2.client.Connection
import org.apache.qpid.protonj2.client.Delivery
import org.apache.qpid.protonj2.client.Receiver
import org.apache.qpid.protonj2.client.Session
import org.apache.qpid.protonj2.client.exceptions.ClientException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
/**
* This is the thread and connection handling manager.
*
* This class is instantiated during the bootstrap process
* and it abstract the logic of maintaining separate threads
* per {@link Consumer} and {@link SchedulePublisher}
*
* you do not need to instantiated this class. An instance
* of this class is available in the {@link Context}
*/
class Manager {
private Logger logger = LoggerFactory.getLogger(Manager.class)
private final ExecutorService executorService
private final AtomicBoolean running
private Connection connection
public Manager(Connection connection){
this.connection = connection
this.executorService = Executors.newCachedThreadPool();
this.running = new AtomicBoolean(false);
}
protected boolean getRunning() {
return running.get()
}
public stop(){
this.running.set(false)
executorService.shutdown()
}
public start(){
this.running.set(true)
this.executorService.submit(new Runnable() {
@Override
void run() {
while (running){
Thread.sleep(1000)
}
logger.info("Closing")
}
})
}
/**
* This is managed by the context, whose access is controlled by an atomic
* reference. Should be thread safe
*
* @param context
* @param consumers
*/
protected void startPublisher(Context context, Publisher publisher) {
logger.debug("Registering publisher {}", publisher)
String address = context.buildAddressFromLink(publisher)
publisher.setLink(address,this.connection.openSender(address))
if (publisher instanceof SchedulePublisher){
logger.debug("Adding scheduled publisher as scheduled publisher {}", publisher)
this.executorService.submit(
new Runnable() {
@Override
void run() {
boolean healthy = true
while(healthy && running){
try{
logger.debug("Processing scheduled executor [{}] {} ", publisher.key, address)
publisher.send()
logger.debug("\t waiting for {} = {} ",address, publisher.delay)
Thread.sleep(publisher.delay*1000)
}catch (Exception e){
logger.error("Error processing scheduled executor [{}] - disabling", publisher.key,e)
healthy=false
}
}
}
}
)
}
}
/**
* This is managed by the context, whose access is controlled by an atomic
* reference. Should be thread safe
*
* @param context
* @param consumers
*/
protected void startConsumer(Context context, Consumer consumer) {
logger.debug("Starting consumer {} => {}", consumer.key(),consumer.address())
executorService.submit(new Runnable() {
@Override
void run() {
String address = context.buildAddressFromLink(consumer)
try {
Session session = connection.openSession().openFuture().get();
Receiver receiver = session.openReceiver(address).openFuture().get();
logger.info("Linking consumer {}", address)
if (consumer.hasApplication()){
logger.info("\t for application {}", consumer.getAplication())
}
consumer.setLink(address,receiver)
while (running && consumer.getActive()) {
Delivery delivery = receiver.receive();
logger.debug("received delivery {}", address)
if (delivery != null) {
if(consumer.hasApplication()){
if(consumer.getAplication() == delivery.message().subject()){
consumer.onDelivery(delivery, context)
}
}else{
consumer.onDelivery(delivery, context)
}
}
}
logger.info("Stopping consumer {}", address)
receiver.close();
session.close();
} catch (ClientException e) {
logger.error("Client exception for {} ",address,e)
} catch (Exception e){
logger.error("General exception for {} ",address,e)
}
}
});
}
}

@ -10,23 +10,80 @@ import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
/**
* This is the core publisher class which abstract the logic to
* publish events.
*
*
* Using this class you define the AMQP address for which you wish
* to publish messages.
*
* The class takes care of preparing the the {@link Message} including
* content-type, message payload, and serialization
*
*/
class Publisher extends Link<Sender> {
private static final Logger logger = LoggerFactory.getLogger(Publisher.class)
/**
*
* @param key This is unique identifier of the Publisher.
* @param address This is the AMQP address which will be appended to the
* {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} for example
* if the base name is "foo", and the component is "bar" and the address is "hello"
* the AMQP address will be compiled as "foo.bar.hello"
*
* @param topic A boolean parameter defining wether the address relates to a topic of a queue
* if it is a topic then "topic://" will be pre-appended to the address so the
* result will be "topic://foo.bar.hello"
* @param FQDN - If you wish to ignore the {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()}
* and subscribe to an arbitrary address, then set this to true, and you are
* responsible for writing the fully qualified address for the {@link #address}
* parameter
*/
Publisher(String key, String address, boolean Topic, boolean FQDN=false) {
super(key, address, Topic, FQDN)
}
/**
* This method send the body without filtering
* on a specific application.
*
* @param body
* @return
*/
public void send() {
send([:] as Map,'')
}
public send(Map body) {
send(body,'')
}
/**
* Use this method to send a message using this
* publisher, filtering on the specific applications
*
* @param body This is the payload of the message
* @param application This is the application for which to send the message to
* @return
*/
public send(Map body, String application) {
logger.debug("{} Sending {}", this.address, body)
if(body == null){
body = [] as Map
}
def message = this.prepareMessage(body)
if(application != null && application != ''){
message.subject(application)
}
Tracker tracker = this.link.send(message)
tracker.awaitSettlement();
}
private Message<Map<String, Object>> prepareMessage(Map body){
def toSend=[
@ -35,6 +92,7 @@ class Publisher extends Link<Sender> {
toSend.putAll(body)
Message<Map<String, Object>> message = Message.create(toSend);
message.contentType("application/json")
message.to(this.linkAddress)
return message

@ -1,8 +1,31 @@
package eu.nebulouscloud.exn.core
/**
* This is an extension of the {@link Publisher} class
* which allows the user to automatically send message
* at scheduled intervals.
*
*/
class SchedulePublisher extends Publisher{
private final int delay
/**
*
* @param delay The delay in seconds we wish to send the recurring messages
* @param key This is unique identifier of the Publisher.
* @param address This is the AMQP address which will be appended to the
* {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} for example
* if the base name is "foo", and the component is "bar" and the address is "hello"
* the AMQP address will be compiled as "foo.bar.hello"
*
* @param topic A boolean parameter defining wether the address relates to a topic of a queue
* if it is a topic then "topic://" will be pre-appended to the address so the
* result will be "topic://foo.bar.hello"
* @param FQDN - If you wish to ignore the {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()}
* and subscribe to an arbitrary address, then set this to true, and you are
* responsible for writing the fully qualified address for the {@link #address}
* parameter
*/
SchedulePublisher(Integer delay, String key, String address, boolean Topic, boolean FQDN) {
super(key, address, Topic, FQDN)
this.delay = delay

@ -1,5 +1,17 @@
package eu.nebulouscloud.exn.core
/**
* This is an extension of the {@link Publisher} class
* which abstracts the definition to send component states,
* which are required by the NebulOuScomponentns.
*
* An instance of this class is created during the boostrap
* process and available using the `state` key in the
* {@link Context}
*
*/
class StatePublisher extends Publisher{
StatePublisher() {
super("state", "state", true, false)

@ -5,19 +5,22 @@ import org.apache.qpid.protonj2.client.Message
import java.util.concurrent.atomic.AtomicReference
/**
* This is the main entry point once the application has started.
*
* Upon initialization and thread handling
*
*
*/
abstract class ConnectorHandler {
boolean initialized=false
private AtomicReference<Context> context
public setReady(AtomicReference<Context> context){
this.initialized = true
this.context = context
this.onReady(context)
}
public void onReady(AtomicReference<Context> context){
/**
* This method is called once all initilization has
* completed and the {@link Context} has been instatiated.
* @param context
*/
public void onReady(Context context){
}

@ -5,7 +5,19 @@ import org.aeonbits.owner.Config.Key
import org.aeonbits.owner.Config.Sources
import org.aeonbits.owner.Config.DefaultValue
/**
*
* This class provides the configuration requirements
* for connecting to the broker and instantiating the
* connector.
*
* These properties can be read either
*
* A file in the root path of the application `exn.properties`
* A classpath resource `exn.properties`
* Java -D properties
* Environment variables
*/
@Sources([
"file:./exn.properties",
"classpath:exn.properties",
@ -14,23 +26,48 @@ import org.aeonbits.owner.Config.DefaultValue
])
public interface ExnConfig extends Config {
/**
* This is the base name of the addresses which
* will be generated by the consumers and producers
* @return
*/
@Key("exn.basename")
@DefaultValue("eu.nebulouscloud")
String baseName()
/**
* This is the default interval of the Health publisher
* @return
*/
@Key("exn.health.timeout")
@DefaultValue("15")
Integer healthTimeout()
/**
* This is the url of the broker
* @return
*/
@Key("broker.url")
String url()
/**
* This is the port of the broker
* @return
*/
@Key("broker.port")
int port();
/**
* This is the username required to log into
* @return
*/
@Key("broker.username")
String username()
/**
* This is the password required to log into
* @return
*/
@Key("broker.password")
String password()
}

@ -5,12 +5,14 @@ import org.aeonbits.owner.Config.DefaultValue
import org.aeonbits.owner.Config.Key
import org.aeonbits.owner.Config.Sources
@Sources([
"file:./exn.properties",
"classpath:exn.properties",
"system:properties",
"system:env"
])
/**
* This class extends {@link ExnConfig} and allows you
* to explicitly and statically define the configuration
* properties, in order to handle the configuration
* of your component in the way you choose.
*
*
*/
public class StaticExnConfig implements ExnConfig {
private final String baseName
@ -26,7 +28,7 @@ public class StaticExnConfig implements ExnConfig {
String username,
String password,
Integer healthTimeout=15,
String baseName='eu.nebulous'
String baseName='eu.nebulouscloud'
){
this.url = url

@ -0,0 +1,105 @@
package eu.nebulouscloud.exn.core
import eu.nebulouscloud.exn.handlers.ConnectorHandler
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
public class ContextTest {
def Context c
@BeforeEach
public void initContext(){
c = new Context("uri","base",new ConnectorHandler() {
@Override
void onReady(Context context) {
}
})
}
@Test
public void testPublishersRegistration(){
def publisher = new Publisher("test","address",true)
c.registerPublisher(publisher)
Assertions.assertTrue(c.hasPublisher("test"))
Assertions.assertEquals(publisher,c.getPublisher("test"))
}
@Test
public void testConsumersRegistration(){
def consumer = new Consumer("test","address",new Handler(){})
c.registerConsumer(consumer)
Assertions.assertTrue(c.hasConsumer("test"))
Assertions.assertEquals(consumer,c.getConsumer("test"))
}
@Test
public void testBuildAddressFromLink(){
def consumer = new Consumer("test","address",new Handler(){})
Assertions.assertEquals(c.buildAddressFromLink(consumer), "topic://base.address");
consumer = new Consumer("test","address",new Handler(){},false)
Assertions.assertEquals(c.buildAddressFromLink(consumer), "base.address");
consumer = new Consumer("test","address",new Handler(){},false,true)
Assertions.assertEquals(c.buildAddressFromLink(consumer), "address");
consumer = new Consumer("test","address",new Handler(){},true,true)
Assertions.assertEquals(c.buildAddressFromLink(consumer), "topic://address");
def publisher = new Publisher("test","address",true)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://base.address");
publisher = new Publisher("test","address",false)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "base.address");
publisher = new Publisher("test","address",false,true)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "address");
publisher = new Publisher("test","address",true,true)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://address");
}
@Test
public void testMatchAddress(){
def consumer = new Consumer("test","address",new Handler(){},false)
Assertions.assertEquals(c.buildAddressFromLink(consumer), "base.address");
consumer = new Consumer("test","address",new Handler(){},false,true)
Assertions.assertEquals(c.buildAddressFromLink(consumer), "address");
consumer = new Consumer("test","address",new Handler(){},true,true)
Assertions.assertEquals(c.buildAddressFromLink(consumer), "topic://address");
def publisher = new Publisher("test","address",true)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://base.address");
publisher = new Publisher("test","address",false)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "base.address");
publisher = new Publisher("test","address",false,true)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "address");
publisher = new Publisher("test","address",true,true)
Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://address");
}
}