diff --git a/exn-connector/build.gradle b/exn-connector/build.gradle index e355350..7adbfc4 100644 --- a/exn-connector/build.gradle +++ b/exn-connector/build.gradle @@ -1,10 +1,3 @@ -/* - * This file was generated by the Gradle 'init' task. - * - * This generated file contains a sample Groovy library project to get you started. - * For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle - * User Manual available at https://docs.gradle.org/7.5.1/userguide/building_java_projects.html - */ plugins { // Apply the groovy Plugin to add support for Groovy. @@ -13,7 +6,6 @@ plugins { // Apply the java-library plugin for API and implementation separation. id 'java-library' } - repositories { // Use Maven Central for resolving dependencies. mavenCentral() @@ -33,8 +25,6 @@ dependencies { testImplementation 'org.spockframework:spock-core:2.1-groovy-3.0' testImplementation 'junit:junit:4.13.2' - // This dependency is exported to consumers, that is to say found on their compile classpath. - api 'org.apache.commons:commons-math3:3.6.1' } tasks.named('test') { @@ -43,7 +33,16 @@ tasks.named('test') { } java { - toolchain { - languageVersion.set(JavaLanguageVersion.of(11)) + // Set the Java version for source and target compatibility + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 +} + + +sourceSets { + main{ + groovy{ + srcDirs = ['src/main/groovy','src/main/resources','src/main/examples'] + } } } \ No newline at end of file diff --git a/exn-connector/settings.gradle b/exn-connector/settings.gradle index e9f3f49..233848c 100644 --- a/exn-connector/settings.gradle +++ b/exn-connector/settings.gradle @@ -8,4 +8,3 @@ */ rootProject.name = 'eu.nebulouscloud.exn' -include('examples') diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy deleted file mode 100644 index e93e975..0000000 --- a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.groovy +++ /dev/null @@ -1,114 +0,0 @@ -import eu.nebulouscloud.exn.Connector -import eu.nebulouscloud.exn.core.Consumer -import eu.nebulouscloud.exn.core.Context -import eu.nebulouscloud.exn.core.Publisher -import eu.nebulouscloud.exn.core.StatePublisher -import eu.nebulouscloud.exn.handlers.ConnectorHandler -import eu.nebulouscloud.exn.settings.ExnConfig -import eu.nebulouscloud.exn.settings.StaticExnConfig -import org.apache.qpid.protonj2.client.Message -import org.apache.qpid.protonj2.client.exceptions.ClientException -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import java.util.concurrent.atomic.AtomicReference - -class MyPublisher extends Publisher{ - - MyPublisher() { - super('preferences', 'preferences.changed', true) - } - - public send(){ - super.send([ - "preferences": [ - "dark_mode": true - ] - ]) - } -} - - -class MyConnectorHandler extends ConnectorHandler { - - - @Override - def void onReady(AtomicReference context) { - println ("Ready") - - - /* - Here we are checking to see inf the default `state` publisher is - available. Even though this should be already be known by the - developer, a check never did any harm. - - The state publisher is a core publisher with the required - methods to pubilsh component state. - - Calling these methods and bootstraing them into the application - logic falls on the developer. - - */ - - - if(context.get().hasPublisher('state')){ - - StatePublisher sp = context.get().getPublisher("state") as StatePublisher - - sp.starting() - sp.started() - sp.custom("forecasting") - sp.stopping() - sp.stopped() - - } - - /** - * This is an example of a default Publisher just sending an arbitrary message - * - */ - (context.get().getPublisher("config") as Publisher).send([ - "hello": "world" - ] as Map) - - /** - * This is an example of an extended publisher where the body of the message - * is managed internally by the class - */ - (context.get().getPublisher("preferences") as MyPublisher).send() - - } - -} - - -public static void main(String[] args) { - try { - - - Connector c = new Connector( - - "ui", - new MyConnectorHandler(), - [ - new Publisher("config","config",true), - new MyPublisher() - ], - [], - true, - true, - new StaticExnConfig( - 'localhost', - 5672, - "admin", - "admin" - ) - - ) - - c.start() - } catch (ClientException e) { - e.printStackTrace(); - } -} - diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.java b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.java new file mode 100644 index 0000000..baffca3 --- /dev/null +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestPublisher.java @@ -0,0 +1,122 @@ +package eu.nebulouscloud.exn; + + +import eu.nebulouscloud.exn.core.Context; +import eu.nebulouscloud.exn.core.Publisher; +import eu.nebulouscloud.exn.core.StatePublisher; +import eu.nebulouscloud.exn.handlers.ConnectorHandler; +import eu.nebulouscloud.exn.settings.StaticExnConfig; +import org.apache.qpid.protonj2.client.exceptions.ClientException; + +import java.util.List; +import java.util.Map; + +class MyPublisher extends Publisher { + + public MyPublisher() { + super("preferences", "preferences.changed", true); + } + + public void send(){ + super.send(Map.of( + "dark_mode",true + ),"a"); + } +} + + +class MyPublisherTestConnectorHandler extends ConnectorHandler { + + + @Override + public void onReady(Context context) { + System.out.println("Ready"); + + + /* + Here we are checking to see inf the default `state` publisher is + available. Even though this should be already be known by the + developer, a check never did any harm. + + The state publisher is a core publisher with the required + methods to pubilsh component state. + + Calling these methods and bootstraing them into the application + logic falls on the developer. + + */ + + + if(context.hasPublisher("state")){ + + StatePublisher sp = (StatePublisher) context.getPublisher("state"); + + sp.starting(); + sp.started(); + sp.custom("forecasting"); + sp.stopping(); + sp.stopped(); + + } + + /** + * This is an example of a default Publisher just sending an arbitrary message + * + */ + if(context.hasPublisher("config")) { + + (context.getPublisher("config")).send( + Map.of("hello","world"), + "one" + ); + (context.getPublisher("config")).send( + Map.of("hello","world"), + "two" + ); + + } + + /** + * This is an example of an extended publisher where the body of the message + * is managed internally by the class + */ + (context.getPublisher("preferences")).send(); + + } + +} + +class TestPublisher{ + public static void main(String[] args) { + try { + + Connector c = new Connector( + + "ui", + new MyPublisherTestConnectorHandler(), + List.of( + new Publisher("config","config",true), + new MyPublisher() + ), + List.of(), + true, + true, + new StaticExnConfig( + "localhost", + 5672, + "admin", + "admin" + ) + + ); + + c.start(); + + + } catch (Exception e) { + e.printStackTrace(); + } + } + + +} diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy deleted file mode 100644 index 1df9190..0000000 --- a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.groovy +++ /dev/null @@ -1,59 +0,0 @@ -import eu.nebulouscloud.exn.Connector -import eu.nebulouscloud.exn.core.Consumer -import eu.nebulouscloud.exn.core.Context -import eu.nebulouscloud.exn.core.Handler -import eu.nebulouscloud.exn.handlers.ConnectorHandler -import eu.nebulouscloud.exn.settings.StaticExnConfig -import org.apache.qpid.protonj2.client.Message -import org.apache.qpid.protonj2.client.exceptions.ClientException - - -import java.util.concurrent.atomic.AtomicReference - - -class MyConnectorHandler extends ConnectorHandler { - @Override - def void onReady(AtomicReference context) { - println ("Ready start working") - } -} - -class MyCustomConsumerHandler extends Handler{ - @Override - def void onMessage(String key, String address, Map body, Message message, AtomicReference context) { - println "Received by custom handler ${key} => ${address} = ${body}" - } -} - - -public static void main(String[] args) { - try { - - Connector c = new Connector( - "ui", - new MyConnectorHandler(), - [], - [ - new Consumer("ui_health","health", new MyCustomConsumerHandler(), true), - new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){ - @Override - def void onMessage(String key, String address, Map body, Message rawMessage, AtomicReference context) { - if(key == "ui_all"){ - println "These are my preferences => ${body}" - } - } - },true,true), - ], - new StaticExnConfig( - 'localhost', - 5672, - "admin", - "admin" - ) - ) - c.start() - } catch (ClientException e) { - e.printStackTrace(); - } -} - diff --git a/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.java b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.java new file mode 100644 index 0000000..16c592b --- /dev/null +++ b/exn-connector/src/main/examples/eu/nebulouscloud/exn/TestReceiver.java @@ -0,0 +1,107 @@ +package eu.nebulouscloud.exn; + + +import eu.nebulouscloud.exn.core.Consumer; +import eu.nebulouscloud.exn.core.Context; +import eu.nebulouscloud.exn.core.Handler; +import eu.nebulouscloud.exn.handlers.ConnectorHandler; +import eu.nebulouscloud.exn.settings.StaticExnConfig; +import org.apache.qpid.protonj2.client.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +class MyCustomConsumerHandler extends Handler{ + Logger logger = LoggerFactory.getLogger(MyCustomConsumerHandler.class); + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + logger.info("Received by custom handler {} => {} = {}", key,address,String.valueOf(body)); + } +} + +class MyConnectorHandler extends ConnectorHandler { + + Logger logger = LoggerFactory.getLogger(MyConnectorHandler.class); + + @Override + public void onReady(Context context) { + logger.info ("Ready start working"); + context.registerConsumer(new Consumer("ui_health","health", new MyCustomConsumerHandler(), true)); + + + /** + * We can then de-register the consumer + */ + new Thread(){ + @Override + public void run() { + + try { + logger.debug("Waiting for 50 s to unregister consumer"); + Thread.sleep(30000); + context.unregisterConsumer("ui_health"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }.start(); + + + } +} + + +class TestReceiver{ + + + public static void main(String[] args) { + try { + Connector c = new Connector( + "ui", + new MyConnectorHandler(), + List.of(), + List.of( + new Consumer("ui_all","eu.nebulouscloud.ui.preferences.>", new Handler(){ + @Override + public void onMessage(String key, String address, Map body, Message rawMessage, Context context) { + if(Objects.equals(key, "ui_all")){ + System.out.println("These are my preferences => "+ String.valueOf(body)); + } + } + },true,true), + new Consumer("config_one","config", new Handler(){ + @Override + public void onMessage(String key, String address, Map body, Message rawMessage, Context context) { + System.out.println("These are my ONE config => "+ String.valueOf(body)); + } + },"one", true), + new Consumer("config_two","config", new Handler(){ + @Override + public void onMessage(String key, String address, Map body, Message rawMessage, Context context) { + + System.out.println("These are my TWO config => "+ String.valueOf(body)); + } + },"two", true) + + ), + false, + false, + new StaticExnConfig( + "localhost", + 5672, + "admin", + "admin" + ) + ); + c.start(); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} + + diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy index fefdf72..8be3fe4 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/Connector.groovy @@ -4,30 +4,53 @@ import eu.nebulouscloud.exn.core.* import eu.nebulouscloud.exn.handlers.ConnectorHandler import eu.nebulouscloud.exn.settings.ExnConfig import org.aeonbits.owner.ConfigFactory -import org.apache.qpid.protonj2.client.* +import org.apache.qpid.protonj2.client.Client +import org.apache.qpid.protonj2.client.Connection +import org.apache.qpid.protonj2.client.ConnectionOptions import org.apache.qpid.protonj2.client.exceptions.ClientException import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.util.concurrent.ExecutorService import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.TimeUnit + +/** + * This is the connector class of the EXNConnector. Through this + * class you connect to the broker, defined the default consumers + * and publishers. + * + * This abstract all the boiler plate required. + * + */ public class Connector { - private static final Logger logger = LoggerFactory.getLogger(Connector.class) + private static final Logger logger = LoggerFactory.getLogger(Connector.class) private final String component - private final Publisher[] publishers - private final Consumer[] consumers - private final ExecutorService executorService - private Connection connection private ExnConfig config - private final AtomicBoolean running - private final AtomicReference context - private final AtomicReference handler + private final Context context + private Connection connection + private Manager manager + /** + * + * @param component This is the name of your component. It will be used in the + * AMPQ addresses generated when FQDN is false. If the {@link ExnConfig#baseName()} + * is 'eu.nebulouscloud' and your component name is 'ui', and your {@link Link#address} has + * a value of 'config', then the AMQP address will be 'eu.nebulouscloud.ui' + * @param handler This is the {@link ConnectorHandler} which will be called once the initialization process + * is complete + * @param publishers A list of publisher which will be ready upon initialization and added to the + * {@link Context} automatically + * @param consumers A list of consumers which will be ready upon initialization and added to the + * {@link Context} automatically + * @param enableState This will enable the default {@link StatePublisher} + * @param enableHealth This will enable the default {@link SchedulePublisher} which will ping + * the components health at {@link ExnConfig#healthTimeout()} + * + * @param configuration An optional {@link eu.nebulouscloud.exn.settings.StaticExnConfig} instance + */ public Connector( String component, ConnectorHandler handler, @@ -40,22 +63,16 @@ public class Connector { assert component this.component = component - this.consumers = consumers - this.running = new AtomicBoolean(true); - this.handler = new AtomicReference<>(handler) this.config = ConfigFactory.create(ExnConfig.class) - if (configuration == null ){ configuration = ConfigFactory.create(ExnConfig.class) } - this.config = configuration - this.context = new AtomicReference<>( - new Context( - "${configuration .url()}:${configuration .port()}", - "${configuration .baseName()}.${this.component}" + this.context = new Context( + "${configuration.url()}:${configuration.port()}", + "${configuration.baseName()}.${this.component}", + handler ) - ) List compiledPublishers = new ArrayList<>() if (enableState) { @@ -76,111 +93,61 @@ public class Connector { ) } compiledPublishers.addAll(publishers) - this.publishers = compiledPublishers - this.executorService = Executors.newCachedThreadPool(); - - } - private void startQueueListener(Consumer consumer) { - executorService.submit(new Runnable() { - @Override - void run() { - String address = context.get().buildAddressFromLink(consumer) - try { - Session session = connection.openSession().openFuture().get(); - Receiver receiver = session.openReceiver(address).openFuture().get(); - consumer.setLink(address,receiver) - while (running.get()) { - Delivery delivery = receiver.receive(); - if (delivery != null) { - consumer.onDelivery(delivery, context) - } - } - receiver.close(); - session.close(); - } catch (ClientException e) { - logger.error("Client exception for {} ",address,e) - } catch (Exception e){ - logger.error("General exception for {} ",address,e) - } - } - }); - } + for( Consumer c : consumers){ - - public void stop() { - try { - running.set(false) - connection.close() - executorService.shutdown() - logger.info("Connector stopped gracefully.") - } catch (ClientException e) { - logger.error("Error stopping connector ", e) + this.context.registerConsumer(c) } + for( Publisher p : compiledPublishers){ + + this.context.registerPublisher(p) + } + + } - public start() { + /** + * Stop everything + */ + public void stop() { + this.context.stop() + + + def executor = Executors.newSingleThreadScheduledExecutor() + executor + .schedule(new Runnable() { + @Override + void run() { + try { + connection.close() + logger.info("Connector stopped gracefully.") + } catch (ClientException e) { + } + + executor.shutdown() + } + },10, TimeUnit.SECONDS) + + } + + /** + * Start everythin + */ + public void start() { logger.info("Starting connector...") try { + final Client client = Client.create(); final ConnectionOptions connectionOpts = new ConnectionOptions(); connectionOpts.user(config.username()); connectionOpts.password(config.password()); connectionOpts.reconnectEnabled(true); + this.connection = client.connect(config.url(), config.port(), connectionOpts) - for (Publisher p : publishers) { - - String address = this.context.get().buildAddressFromLink(p) - p.setLink(address,connection.openSender(address)) - logger.debug("Registering publisher {}", p) - this.context.get().registerPublisher(p) - - if (p instanceof SchedulePublisher){ - logger.debug("Adding scheduled publisher as scheduled publisher {}", p) - final Publisher threadPublisher = p; - this.executorService.submit( - new Runnable() { - @Override - void run() { - boolean healthy = true - while(healthy && running.get()){ - try{ - logger.debug("Processing scheduled executor [{}] {} ", threadPublisher.key, address) - threadPublisher.send() - logger.debug("\t waiting for {} = {} ",address, threadPublisher.delay) - Thread.sleep(threadPublisher.delay*1000) - }catch (Exception e){ - logger.error("Error processing scheduled executor [{}] - disabling", threadPublisher.key,e) - healthy=false - } - } - } - } - ) - } - - - } - - for (Consumer c : consumers) { - logger.debug("Registering consumer {}", c) - this.context.get().registerConsumers(c) - this.startQueueListener(c) - } - - this.executorService.submit(new Runnable() { - @Override - void run() { - while (running.get()){ - Thread.sleep(1000) - } - } - }) - - this.handler.get().setReady(this.context) - + this.manager = new Manager(this.connection) + this.context.setManager(manager) } catch (Exception e) { logger.error("Error starting connector", e) diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Consumer.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Consumer.groovy index 5dc0f6d..4f16ac7 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Consumer.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Consumer.groovy @@ -1,25 +1,75 @@ package eu.nebulouscloud.exn.core -import eu.nebulouscloud.exn.handlers.ConnectorHandler + import org.apache.qpid.protonj2.client.Delivery import org.apache.qpid.protonj2.client.Message import org.apache.qpid.protonj2.client.Receiver import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.util.concurrent.atomic.AtomicReference - +/** + * This is the core consumer class which abstract the logic to + * receive the event. + * + * + * Using this class you define the AMQP address for which you wish + * to receive messages. + * + * Once a message is received this can then be handled by a {@link Handler} + * instance + * + */ class Consumer extends Link{ + private static final Logger logger = LoggerFactory.getLogger(Consumer.class) private Handler handler + private String application - Consumer(String key, String address, Handler handler, boolean topic=true, boolean FQDN=false) { + /** + * + * @param key This is unique identifier of the Consumer. + * @param address This is the AMQP address which will be appended to the + * {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} for example + * if the base name is "foo", and the component is "bar" and the address is "hello" + * the AMQP address will be compiled as "foo.bar.hello" + * + * @param handler This is {@link Handler} class which you will use the process the message + * @param application This is an optional key to filter messages for a specific application + * @param topic A boolean parameter defining wether the address relates to a topic of a queue + * if it is a topic then "topic://" will be pre-appended to the address so the + * result will be "topic://foo.bar.hello" + * @param FQDN - If you wish to ignore the {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} + * and subscribe to an arbitrary address, then set this to true, and you are + * responsible for writing the fully qualified address for the {@link #address} + * parameter + */ + Consumer(String key, String address, Handler handler, String application, boolean topic=true, boolean FQDN=false) { super(key, address, topic, FQDN) this.handler = handler + this.application = application } - public onDelivery(Delivery delivery, AtomicReference context){ + + Consumer(String key, String address, Handler handler, boolean topic=true, boolean FQDN=false) { + this(key, address, handler, null, topic,FQDN) + } + + public boolean hasApplication(){ + return this.application != null + } + + public String getAplication(){ + return this.application + } + + + protected Map processMessage(Message message, Context context){ + logger.debug("Processing message for{}",this.linkAddress) + return (Map)message.body() + } + + protected void onDelivery(Delivery delivery, Context context){ logger.debug("Default on delivery for delivery for {}",this.linkAddress) Message message = delivery.message(); @@ -31,12 +81,7 @@ class Consumer extends Link{ message, context ) - delivery.accept(); - } - - public Map processMessage(Message message, AtomicReference context){ - logger.debug("Processing message for{}",this.linkAddress) - return (Map)message.body() + delivery.accept() } } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy index 66f6595..fa87eaa 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Context.groovy @@ -1,20 +1,92 @@ package eu.nebulouscloud.exn.core +import eu.nebulouscloud.exn.handlers.ConnectorHandler +import groovy.util.logging.Log +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.atomic.AtomicReference + +/** + * This class maintains the application states and is provided + * inside the event loop. + * + * It includes a set of utilities which are helpful during + * setup of your asynchronous application and after its + * initialization. + * + * Through this class you can register consumers, as + * well as publishers, once the event loop has been + * initiated + * + */ class Context { + Logger logger = LoggerFactory.getLogger(Context.class) + private final String uri private final String base private final Map publishers = [:] private final Map consumers = [:] + private final ConnectorHandler handler + private Manager manager - public Context(String uri, String base){ - this.base = base + public Context(String uri, String base, ConnectorHandler handler){ this.uri = uri + this.base = base + this.handler = handler } - def getPublisher(key) { - publishers[key] + Manager getManager() { + return manager + } + + /** + * + * This method is called when the context is started, + * so it is a good location to initialize the consumers + * already registered. + * @param manager + */ + public void setManager(Manager manager) { + + this.manager = manager + + + logger.info("Registering {} consumers", this.consumers.size()) + this.manager.start() + this.consumers.each({ + k,v -> { + + final Consumer c =v + this.manager.startConsumer(this, c) + + } + }) + + logger.info("Registering {} publishers", this.publishers.size()) + this.publishers.each({ + k,v -> { + + final Publisher p =v + this.manager.startPublisher(this, p) + + } + }) + + + this.handler.onReady(this) + + + } + + def Publisher getPublisher(key) { + publishers[key] as Publisher + } + + def Consumer getConsumer(key) { + consumers[key] as Consumer } boolean hasPublisher(key) { @@ -25,12 +97,53 @@ class Context { consumers.containsKey(key) } - void registerPublisher(publisher) { + void registerPublisher(Publisher publisher) { publishers[publisher.key()] = publisher + if(this.manager !=null && this.manager.getRunning()){ + final Publisher p =publisher + this.manager.startPublisher(this,p) + } } - void registerConsumers(consumer) { + void registerConsumer(Consumer consumer) { + logger.debug("Registering consumer {}=>{}",consumer.key(),consumer.address()) consumers[consumer.key()] = consumer + if(this.manager !=null && this.manager.getRunning()){ + final Consumer c = consumer + this.manager.startConsumer(this,c) + } + } + + void unregisterConsumer(String key){ + logger.debug("Un-Registering consumer {}",key) + if(consumers.containsKey(key)){ + Consumer c = consumers.get(key) + c.active=false + consumers.remove(key) + } + } + + void unregisterPublisher(String key){ + if(publishers.containsKey(key)){ + publishers.remove(key) + } + } + + + void stop(){ + + publishers.each {p -> { + p.setActive(false) + p.link.close() + }} + + consumers.each {p -> { + p.setActive(false) + p.link.close() + }} + + manager.stop() + } String buildAddressFromLink(Link link) { @@ -41,23 +154,5 @@ class Context { address } - boolean matchAddress(Link link, event) { - if (!event || !event.message || !event.message.address) { - return false - } - String address = buildAddressFromLink(link) - address == event.message.address - } - - String buildAddress(String[] actions, boolean topic = false) { - if (actions.length <= 0) { - return base - } - String address = "${base}.${actions.join('.')}" - if (topic) { - address = "topic://${address}" - } - address - } } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy index a47d2bb..b55d0c2 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Handler.groovy @@ -6,10 +6,24 @@ import org.slf4j.LoggerFactory import java.util.concurrent.atomic.AtomicReference +/** + * This is the handler class for the {@link Consumer}. You will need to + * create extension of this class in order to handle incoming messages. + */ abstract class Handler { private static final Logger logger = LoggerFactory.getLogger(Consumer.class) - public void onMessage(String key, String address, Map body, Message message, AtomicReference context){ + /** + * This is the default handle method, which needs to be overwritten if we + * need to handle + * @param key + * @param address + * @param body + * @param message + * @param context + */ + + public void onMessage(String key, String address, Map body, Message message, Context context){ logger.debug("Default on message for delivery for {} => {} ({}) = {}", key, address, diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy index 7c75590..a50cc77 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Link.groovy @@ -1,5 +1,11 @@ package eu.nebulouscloud.exn.core +/** + * This is a base class which abstract the Proton client Link + * code, and provides a basis for the {@link Publisher} and + * {@link Consumer} classes + * @param + */ abstract class Link>{ protected String key @@ -8,6 +14,8 @@ abstract class Link>{ public boolean topic public boolean fqdn = false public org.apache.qpid.protonj2.client.Link link + private boolean active + public Link( String key, @@ -21,13 +29,16 @@ abstract class Link>{ this.topic = topic this.address = address this.key = key + this.active = true + } public String key(){ return this.key } + public String address(){ - return this.key + return this.address } public setLink(String address, org.apache.qpid.protonj2.client.Link link){ @@ -35,4 +46,14 @@ abstract class Link>{ this.linkAddress =address } + + + boolean getActive() { + return active + } + + void setActive(boolean active) { + this.active = active + + } } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy new file mode 100644 index 0000000..42a4a2b --- /dev/null +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Manager.groovy @@ -0,0 +1,150 @@ +package eu.nebulouscloud.exn.core + +import org.apache.qpid.protonj2.client.Connection +import org.apache.qpid.protonj2.client.Delivery +import org.apache.qpid.protonj2.client.Receiver +import org.apache.qpid.protonj2.client.Session +import org.apache.qpid.protonj2.client.exceptions.ClientException +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean + + +/** + * This is the thread and connection handling manager. + * + * This class is instantiated during the bootstrap process + * and it abstract the logic of maintaining separate threads + * per {@link Consumer} and {@link SchedulePublisher} + * + * you do not need to instantiated this class. An instance + * of this class is available in the {@link Context} + */ +class Manager { + + private Logger logger = LoggerFactory.getLogger(Manager.class) + private final ExecutorService executorService + private final AtomicBoolean running + private Connection connection + + public Manager(Connection connection){ + this.connection = connection + this.executorService = Executors.newCachedThreadPool(); + this.running = new AtomicBoolean(false); + } + + protected boolean getRunning() { + return running.get() + } + + public stop(){ + this.running.set(false) + executorService.shutdown() + } + + public start(){ + this.running.set(true) + this.executorService.submit(new Runnable() { + @Override + void run() { + while (running){ + Thread.sleep(1000) + } + logger.info("Closing") + } + }) + } + + /** + * This is managed by the context, whose access is controlled by an atomic + * reference. Should be thread safe + * + * @param context + * @param consumers + */ + protected void startPublisher(Context context, Publisher publisher) { + logger.debug("Registering publisher {}", publisher) + String address = context.buildAddressFromLink(publisher) + publisher.setLink(address,this.connection.openSender(address)) + + if (publisher instanceof SchedulePublisher){ + logger.debug("Adding scheduled publisher as scheduled publisher {}", publisher) + this.executorService.submit( + new Runnable() { + @Override + void run() { + boolean healthy = true + while(healthy && running){ + try{ + logger.debug("Processing scheduled executor [{}] {} ", publisher.key, address) + publisher.send() + logger.debug("\t waiting for {} = {} ",address, publisher.delay) + Thread.sleep(publisher.delay*1000) + }catch (Exception e){ + logger.error("Error processing scheduled executor [{}] - disabling", publisher.key,e) + healthy=false + } + } + } + } + + ) + } + } + + + /** + * This is managed by the context, whose access is controlled by an atomic + * reference. Should be thread safe + * + * @param context + * @param consumers + */ + protected void startConsumer(Context context, Consumer consumer) { + logger.debug("Starting consumer {} => {}", consumer.key(),consumer.address()) + executorService.submit(new Runnable() { + @Override + void run() { + String address = context.buildAddressFromLink(consumer) + try { + Session session = connection.openSession().openFuture().get(); + + Receiver receiver = session.openReceiver(address).openFuture().get(); + + logger.info("Linking consumer {}", address) + if (consumer.hasApplication()){ + logger.info("\t for application {}", consumer.getAplication()) + } + consumer.setLink(address,receiver) + while (running && consumer.getActive()) { + Delivery delivery = receiver.receive(); + logger.debug("received delivery {}", address) + if (delivery != null) { + if(consumer.hasApplication()){ + if(consumer.getAplication() == delivery.message().subject()){ + consumer.onDelivery(delivery, context) + } + }else{ + consumer.onDelivery(delivery, context) + + } + } + } + logger.info("Stopping consumer {}", address) + receiver.close(); + session.close(); + } catch (ClientException e) { + logger.error("Client exception for {} ",address,e) + } catch (Exception e){ + logger.error("General exception for {} ",address,e) + } + } + }); + + } + + +} diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy index c4dcc47..50330a9 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/Publisher.groovy @@ -10,23 +10,80 @@ import java.time.ZoneOffset import java.time.ZonedDateTime import java.time.format.DateTimeFormatter +/** + * This is the core publisher class which abstract the logic to + * publish events. + * + * + * Using this class you define the AMQP address for which you wish + * to publish messages. + * + * The class takes care of preparing the the {@link Message} including + * content-type, message payload, and serialization + * + */ + class Publisher extends Link { private static final Logger logger = LoggerFactory.getLogger(Publisher.class) + /** + * + * @param key This is unique identifier of the Publisher. + * @param address This is the AMQP address which will be appended to the + * {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} for example + * if the base name is "foo", and the component is "bar" and the address is "hello" + * the AMQP address will be compiled as "foo.bar.hello" + * + * @param topic A boolean parameter defining wether the address relates to a topic of a queue + * if it is a topic then "topic://" will be pre-appended to the address so the + * result will be "topic://foo.bar.hello" + * @param FQDN - If you wish to ignore the {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} + * and subscribe to an arbitrary address, then set this to true, and you are + * responsible for writing the fully qualified address for the {@link #address} + * parameter + */ Publisher(String key, String address, boolean Topic, boolean FQDN=false) { super(key, address, Topic, FQDN) } + /** + * This method send the body without filtering + * on a specific application. + * + * @param body + * @return + */ + public void send() { + send([:] as Map,'') + } + public send(Map body) { + send(body,'') + } + + /** + * Use this method to send a message using this + * publisher, filtering on the specific applications + * + * @param body This is the payload of the message + * @param application This is the application for which to send the message to + * @return + */ + + public send(Map body, String application) { logger.debug("{} Sending {}", this.address, body) if(body == null){ body = [] as Map } def message = this.prepareMessage(body) + if(application != null && application != ''){ + message.subject(application) + } Tracker tracker = this.link.send(message) tracker.awaitSettlement(); } + private Message> prepareMessage(Map body){ def toSend=[ @@ -35,6 +92,7 @@ class Publisher extends Link { toSend.putAll(body) Message> message = Message.create(toSend); + message.contentType("application/json") message.to(this.linkAddress) return message diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SchedulePublisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SchedulePublisher.groovy index 58f2c08..9026126 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SchedulePublisher.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/SchedulePublisher.groovy @@ -1,8 +1,31 @@ package eu.nebulouscloud.exn.core +/** + * This is an extension of the {@link Publisher} class + * which allows the user to automatically send message + * at scheduled intervals. + * + */ class SchedulePublisher extends Publisher{ private final int delay + /** + * + * @param delay The delay in seconds we wish to send the recurring messages + * @param key This is unique identifier of the Publisher. + * @param address This is the AMQP address which will be appended to the + * {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} for example + * if the base name is "foo", and the component is "bar" and the address is "hello" + * the AMQP address will be compiled as "foo.bar.hello" + * + * @param topic A boolean parameter defining wether the address relates to a topic of a queue + * if it is a topic then "topic://" will be pre-appended to the address so the + * result will be "topic://foo.bar.hello" + * @param FQDN - If you wish to ignore the {@link eu.nebulouscloud.exn.settings.ExnConfig#baseName()} + * and subscribe to an arbitrary address, then set this to true, and you are + * responsible for writing the fully qualified address for the {@link #address} + * parameter + */ SchedulePublisher(Integer delay, String key, String address, boolean Topic, boolean FQDN) { super(key, address, Topic, FQDN) this.delay = delay diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/StatePublisher.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/StatePublisher.groovy index 5282d44..be8a56f 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/StatePublisher.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/core/StatePublisher.groovy @@ -1,5 +1,17 @@ package eu.nebulouscloud.exn.core + + +/** + * This is an extension of the {@link Publisher} class + * which abstracts the definition to send component states, + * which are required by the NebulOuScomponentns. + * + * An instance of this class is created during the boostrap + * process and available using the `state` key in the + * {@link Context} + * + */ class StatePublisher extends Publisher{ StatePublisher() { super("state", "state", true, false) diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy index cde5678..e4000e6 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/handlers/ConnectorHandler.groovy @@ -5,19 +5,22 @@ import org.apache.qpid.protonj2.client.Message import java.util.concurrent.atomic.AtomicReference +/** + * This is the main entry point once the application has started. + * + * Upon initialization and thread handling + * + * + */ abstract class ConnectorHandler { - boolean initialized=false - private AtomicReference context - public setReady(AtomicReference context){ - this.initialized = true - this.context = context - this.onReady(context) - } - - - public void onReady(AtomicReference context){ + /** + * This method is called once all initilization has + * completed and the {@link Context} has been instatiated. + * @param context + */ + public void onReady(Context context){ } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/ExnConfig.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/ExnConfig.groovy index 304435c..5d49be4 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/ExnConfig.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/ExnConfig.groovy @@ -5,7 +5,19 @@ import org.aeonbits.owner.Config.Key import org.aeonbits.owner.Config.Sources import org.aeonbits.owner.Config.DefaultValue - +/** + * + * This class provides the configuration requirements + * for connecting to the broker and instantiating the + * connector. + * + * These properties can be read either + * + * A file in the root path of the application `exn.properties` + * A classpath resource `exn.properties` + * Java -D properties + * Environment variables + */ @Sources([ "file:./exn.properties", "classpath:exn.properties", @@ -14,23 +26,48 @@ import org.aeonbits.owner.Config.DefaultValue ]) public interface ExnConfig extends Config { + /** + * This is the base name of the addresses which + * will be generated by the consumers and producers + * @return + */ @Key("exn.basename") @DefaultValue("eu.nebulouscloud") String baseName() + /** + * This is the default interval of the Health publisher + * @return + */ @Key("exn.health.timeout") @DefaultValue("15") Integer healthTimeout() + /** + * This is the url of the broker + * @return + */ @Key("broker.url") String url() + /** + * This is the port of the broker + * @return + */ @Key("broker.port") int port(); + /** + * This is the username required to log into + * @return + */ @Key("broker.username") String username() + /** + * This is the password required to log into + * @return + */ @Key("broker.password") String password() } diff --git a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/StaticExnConfig.groovy b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/StaticExnConfig.groovy index 879ba2b..061b28c 100644 --- a/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/StaticExnConfig.groovy +++ b/exn-connector/src/main/groovy/eu/nebulouscloud/exn/settings/StaticExnConfig.groovy @@ -5,12 +5,14 @@ import org.aeonbits.owner.Config.DefaultValue import org.aeonbits.owner.Config.Key import org.aeonbits.owner.Config.Sources -@Sources([ - "file:./exn.properties", - "classpath:exn.properties", - "system:properties", - "system:env" -]) +/** + * This class extends {@link ExnConfig} and allows you + * to explicitly and statically define the configuration + * properties, in order to handle the configuration + * of your component in the way you choose. + * + * + */ public class StaticExnConfig implements ExnConfig { private final String baseName @@ -26,7 +28,7 @@ public class StaticExnConfig implements ExnConfig { String username, String password, Integer healthTimeout=15, - String baseName='eu.nebulous' + String baseName='eu.nebulouscloud' ){ this.url = url diff --git a/exn-connector/src/test/groovy/eu/nebulouscloud/exn/core/ContextTest.groovy b/exn-connector/src/test/groovy/eu/nebulouscloud/exn/core/ContextTest.groovy new file mode 100644 index 0000000..ee19c7a --- /dev/null +++ b/exn-connector/src/test/groovy/eu/nebulouscloud/exn/core/ContextTest.groovy @@ -0,0 +1,105 @@ +package eu.nebulouscloud.exn.core + +import eu.nebulouscloud.exn.handlers.ConnectorHandler +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +public class ContextTest { + + def Context c + + @BeforeEach + public void initContext(){ + c = new Context("uri","base",new ConnectorHandler() { + @Override + void onReady(Context context) { + + } + }) + } + + @Test + public void testPublishersRegistration(){ + + def publisher = new Publisher("test","address",true) + c.registerPublisher(publisher) + + Assertions.assertTrue(c.hasPublisher("test")) + Assertions.assertEquals(publisher,c.getPublisher("test")) + } + + + @Test + public void testConsumersRegistration(){ + + def consumer = new Consumer("test","address",new Handler(){}) + c.registerConsumer(consumer) + + Assertions.assertTrue(c.hasConsumer("test")) + Assertions.assertEquals(consumer,c.getConsumer("test")) + } + + + + @Test + public void testBuildAddressFromLink(){ + + + def consumer = new Consumer("test","address",new Handler(){}) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "topic://base.address"); + + consumer = new Consumer("test","address",new Handler(){},false) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "base.address"); + + consumer = new Consumer("test","address",new Handler(){},false,true) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "address"); + + consumer = new Consumer("test","address",new Handler(){},true,true) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "topic://address"); + + + def publisher = new Publisher("test","address",true) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://base.address"); + + publisher = new Publisher("test","address",false) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "base.address"); + + publisher = new Publisher("test","address",false,true) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "address"); + + publisher = new Publisher("test","address",true,true) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://address"); + + } + + @Test + public void testMatchAddress(){ + + + def consumer = new Consumer("test","address",new Handler(){},false) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "base.address"); + + consumer = new Consumer("test","address",new Handler(){},false,true) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "address"); + + consumer = new Consumer("test","address",new Handler(){},true,true) + Assertions.assertEquals(c.buildAddressFromLink(consumer), "topic://address"); + + + def publisher = new Publisher("test","address",true) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://base.address"); + + publisher = new Publisher("test","address",false) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "base.address"); + + publisher = new Publisher("test","address",false,true) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "address"); + + publisher = new Publisher("test","address",true,true) + Assertions.assertEquals(c.buildAddressFromLink(publisher), "topic://address"); + + } + + +}