diff --git a/example/test_exn_publisher.py b/example/test_exn_publisher.py index 37d900f..4370347 100644 --- a/example/test_exn_publisher.py +++ b/example/test_exn_publisher.py @@ -1,5 +1,15 @@ + +import sys + +from exn.core.publisher import Publisher +from exn.handler.connector_handler import ConnectorHandler + +sys.path.insert(0,'../exn') + import logging -import time + +from dotenv import load_dotenv +load_dotenv() from exn import connector, core @@ -7,7 +17,7 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %( logging.getLogger('exn.connector').setLevel(logging.DEBUG) -class MyHandler(connector.ConnectorHandler): +class Bootstrap(ConnectorHandler): def ready(self, context): if context.has_publisher('state'): @@ -19,33 +29,39 @@ class MyHandler(connector.ConnectorHandler): context.publishers['config'].send({ 'hello': 'world' - }) - context.publishers['preferences'].send() + },application="one") + + context.publishers['config'].send({ + 'good': 'bye' + },application="two") + + if context.has_publisher('preferences'): + context.publishers['preferences'].send() -class MyPublisher(core.publisher.Publisher): +class MyPublisher(Publisher): + def __init__(self): - super().__init__('preferences', 'preferences.changed', True) + super().__init__( 'preferences', 'preferences', topic=True) - def send(self, body={}): - body.update({ + def send(self): + super(MyPublisher, self).send({ "preferences": { "dark_mode": True } }) - super(MyPublisher, self).send(body) -connector = connector.EXN('ui', handler=MyHandler() +connector = connector.EXN('ui', handler=Bootstrap() , publishers=[ core.publisher.Publisher('config', 'config', True), - MyPublisher(), + MyPublisher() ], - enable_health=True, enable_state=False - ,url='localhost' - ,port=5672 - ,username="admin" - ,password="adming" - ) + enable_health=True, enable_state=True + ,url='localhost' + ,port=5672 + ,username="admin" + ,password="admin" + ) connector.start() diff --git a/example/test_exn_receiver.py b/example/test_exn_receiver.py index 6f703a1..8bfe25f 100644 --- a/example/test_exn_receiver.py +++ b/example/test_exn_receiver.py @@ -1,27 +1,64 @@ +import sys +sys.path.insert(0,'../exn') + + import logging -from exn import connector, core +from dotenv import load_dotenv +load_dotenv() + +from proton import Message +from exn.connector import EXN +from exn.core.consumer import Consumer +from exn.core.context import Context +from exn.core.handler import Handler +from exn.handler.connector_handler import ConnectorHandler logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.getLogger('exn.connector').setLevel(logging.DEBUG) -class Bootstrap(connector.ConnectorHandler): - - def on_message(self, key, address, body, context, **kwargs): - logging.info(f"Received {key} => {address}") - if key == 'ui_health': - logging.info(f"I am healthy => {body}") - - if key == 'ui_all': - logging.info(f"These are my preferences => {body}") +class MyGeneralHandler(Handler): + def on_message(self, key, address, body, message: Message, context=None): + logging.info(f"[MyGeneralHandler] Received {key} => {address}: {body}") -connector = connector.EXN('ui', handler=Bootstrap(), +class Bootstrap(ConnectorHandler): + context = None + + def ready(self, context: Context): + self.context = context + # do work here + + self.context.register_consumers( + Consumer('ui_health', 'health', handler=my_general_handler, topic=True) + ) + + +class MyConfigHandler(Handler): + def on_message(self, key, address, body, message: Message, context=None): + logging.info(f"[MyConfigHandler{self}] Received {key} => {address}: {body}") + + +my_general_handler = MyGeneralHandler() + +connector = EXN('ui', handler=Bootstrap(), consumers=[ - core.consumer.Consumer('ui_health', 'health', topic=True), - core.consumer.Consumer('ui_all', 'eu.nebulouscloud.ui.preferences.>', topic=True, - fqdn=True) + Consumer('ui_all', 'eu.nebulouscloud.ui.preferences.>', + handler=my_general_handler, + topic=True, + fqdn=True), + Consumer('config_two', 'config', + handler=MyConfigHandler(), + application="two", + topic=True, + ), + Consumer('config_one', 'config', + handler=MyConfigHandler(), + application="one", + topic=True, + ), + ]) connector.start() diff --git a/exn/__init__.py b/exn/__init__.py index 74c0c17..0b73db1 100644 --- a/exn/__init__.py +++ b/exn/__init__.py @@ -1 +1,6 @@ + + +from . import core +from . import handler +from . import settings from . import connector \ No newline at end of file diff --git a/exn/connector.py b/exn/connector.py index 8a62016..d0193c2 100644 --- a/exn/connector.py +++ b/exn/connector.py @@ -1,117 +1,41 @@ import logging import os -from dotenv import load_dotenv -from proton.handlers import MessagingHandler from proton.reactor import Container -from .core import context as core_context, state_publisher, schedule_publisher +from exn.core import state_publisher, schedule_publisher +from exn.core.context import Context +from .core.manager import Manager from .settings import base +from .handler import connector_handler logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') _logger = logging.getLogger(__name__) -class ConnectorHandler: - def __init__(self): - self.initialized = False - - - def set_ready(self,ready, ctx:core_context.Context): - self.initialized = ready - self.ready(ctx) - - def ready(self, ctx:core_context.Context): - pass - - def on_message(self, key, address, body, context, **kwargs): - pass - - -class CoreHandler(MessagingHandler): - - def __init__(self, - context, - handler: ConnectorHandler, - publishers = [], - consumers = [], - ): - super(CoreHandler, self).__init__() - self.context=context - self.publishers=publishers - self.consumers=consumers - self.handler = handler - self.conn = None - - def on_start(self, event) -> None: - - self.conn = event.container.connect(self.context.connection) - for publisher in self.publishers: - _logger.info(f"{publisher.address} registering sender") - address = self.context.build_address_from_link(publisher) - publisher.set(event.container.create_sender(self.conn,address)) - self.context.register_publisher(publisher) - _logger.debug(f"{self.context.base} Registering timer { hasattr(publisher, 'delay')}") - if hasattr(publisher, "delay"): - _logger.debug(f"{self.context.base} Registering timer") - event.reactor.schedule(publisher.delay, self) - - for consumer in self.consumers: - address = self.context.build_address_from_link(consumer) - _logger.info(f"{self.context.base} Registering consumer {address}") - consumer.set(event.container.create_receiver(self.conn, address)) - self.context.register_consumers(consumer) - - def on_sendable(self, event): - if not self.handler.initialized: - self.handler.set_ready(True, self.context) - - def on_timer_task(self, event): - _logger.debug(f"{self.context.base} On timer") - for publisher in self._delay_publishers(): - publisher.send() - event.reactor.schedule(publisher.delay, self) - - def on_message(self, event): - try: - for consumer in self.consumers: - if consumer.should_handle(event): - _logger.debug(f"Received message: {event.message.address}") - self.handler.on_message(consumer.key, event.message.address, event.message.body, self.context, event=event) - except Exception as e: - _logger.error(f"Received message: {e}") - - - def close(self): - if self.conn: - self.conn.close() - else: - _logger.warning(f"{self.context.base} No open connection") - - def _delay_publishers(self): - return [p for p in self.publishers if hasattr(p,'delay')] - class EXN: + + context = None + container = None + def __init__(self, component=None, - handler:ConnectorHandler = None, - publishers=[], - consumers=[], + handler:connector_handler.ConnectorHandler = None, + publishers=None, + consumers=None, **kwargs): # Load .env file - load_dotenv() - # Validate and set connector if not component: _logger.error("Component cannot be empty or None") raise ValueError("Component cannot be empty or None") self.component = component - self.handler = handler self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL')) self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT')) self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME')) self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD')) + self.handler = handler # Validate attributes if not self.url: @@ -127,29 +51,34 @@ class EXN: _logger.error("PASSWORD cannot be empty or None") raise ValueError("PASSWORD cannot be empty or None") - ctx = core_context.Context( - connection=f"{self.url}:{self.port}", - base=f"{base.NEBULOUS_BASE_NAME}.{self.component}", - ) + self.context = Context(base=f"{base.NEBULOUS_BASE_NAME}.{self.component}") + if not publishers: + publishers = [] + + if not consumers: + consumers = [] + + compiled_publishers = publishers if kwargs.get("enable_state",False): - publishers.append(state_publisher.Publisher()) + compiled_publishers.append(state_publisher.Publisher()) if kwargs.get("enable_health",False): - publishers.append(schedule_publisher.Publisher( + compiled_publishers.append(schedule_publisher.Publisher( base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT, 'health', 'health', - True)) + topic=True)) - core_handler = CoreHandler( - ctx, - handler, - publishers, - consumers - ) + for c in consumers: + self.context.register_consumers(c) - self.container = Container(core_handler) + for p in compiled_publishers: + self.context.register_publisher(p) def start(self): - self.container.run() + self.context.start(Manager(f"{self.url}:{self.port}"),self.handler) + + + def stop(self): + self.context.stop() diff --git a/exn/core/__init__.py b/exn/core/__init__.py index bdc524b..8fb3eb4 100644 --- a/exn/core/__init__.py +++ b/exn/core/__init__.py @@ -1,6 +1,7 @@ from . import context +from . import handler from . import publisher from . import consumer from . import state_publisher diff --git a/exn/core/consumer.py b/exn/core/consumer.py index e5c7424..a92a1c2 100644 --- a/exn/core/consumer.py +++ b/exn/core/consumer.py @@ -1,17 +1,43 @@ -import datetime - -from proton import Message, Event -from . import link import logging +from proton import Event +from .handler import Handler + +from . import link + +from proton.handlers import MessagingHandler + _logger = logging.getLogger(__name__) +_logger.setLevel(level=logging.DEBUG) -class Consumer(link.Link): +class Consumer(link.Link, MessagingHandler): + application = None - def on_message(self, body, **kwargs): - _logger.debug(f"{self.address} Got {body} ") + def __init__(self, key, address, handler: Handler, application=None, topic=False, fqdn=False): + super(Consumer, self).__init__(key, address, topic, fqdn) + self.application = application + self.handler = handler + self.handler._consumer = self def should_handle(self, event: Event): - if event.link == self._link: - return True + + should = event.link.name == self._link.name and \ + (self.application is None or event.message.subject == self.application) + + _logger.debug(f"[{self.key}] checking if link is the same {event.link.name}={self._link.name} " + f" and application {self.application}={event.message.subject} == {should}") + + return should + + def on_start(self, event: Event) -> None: + _logger.debug(f"[{self.key}] on_start") + + def on_message(self, event): + _logger.debug(f"[{self.key}] handling event with address => {event.message.address}") + try: + if self.should_handle(event): + self.handler.on_message(self.key, event.message.address, event.message.body, event.message) + + except Exception as e: + _logger.error(f"Received message: {e}") diff --git a/exn/core/context.py b/exn/core/context.py index 1b99056..db5cde6 100644 --- a/exn/core/context.py +++ b/exn/core/context.py @@ -1,14 +1,60 @@ -from . import link +import logging +from proton.reactor import Container + +from . import link +from .manager import Manager + + +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.DEBUG) class Context: - def __init__(self, connection, base): + base = None + handler = None + publishers = {} + consumers = {} + _manager = None + + def __init__(self, base): - self.connection = connection self.base = base - self.publishers = {} - self.consumers = {} + + def start(self, manager:Manager, handler): + self._manager = manager + + def on_ready(): + _logger.debug("[context] on_ready" ) + for key,publisher in self.publishers.items(): + self._manager.start_publisher(self,publisher) + + for key,consumer in self.consumers.items(): + self._manager.start_consumer(self,consumer) + + handler.ready(context=self) + + self._manager._on_ready=on_ready + self._manager.start() + + def stop(self): + if self._manager is not None and self._manager.started: + for key,publisher in self.publishers: + publisher._link.close() + for key,consumer in self.consumers: + consumer._link.close() + + self._manager.close() + + + def register_publisher(self, publisher): + if publisher.key in self.publishers: + _logger.warning("[context] Trying to register publisher that already exists") + return + _logger.info(f"[context] registering publisher {publisher.key} {publisher.address}" ) + self.publishers[publisher.key] = publisher + if self._manager is not None and self._manager.started: + self._manager.start_publisher(self,publisher) def get_publisher(self, key): if key in self.publishers: @@ -21,11 +67,31 @@ class Context: def has_consumer(self, key): return key in self.consumers - def register_publisher(self, publisher): - self.publishers[publisher.key] = publisher - def register_consumers(self, consumer): + if consumer.key in self.consumers: + _logger.warning("[context] Trying to register consumer that already exists") + return + self.consumers[consumer.key] = consumer + if self._manager is not None and self._manager.started: + self._manager.start_consumer(self,consumer) + + def unregister_consumer(self, key): + if not key in self.consumers: + _logger.warning("[context] Trying to unregister consumer that does not exists") + return + + consumer = self.consumers.pop(key) + if self._manager is not None and self._manager.started: + consumer._link.close() + + def unregister_publisher(self, key): + if not key in self.consumers: + _logger.warning("[context] Trying to unregister publisher that does not exists") + return + publisher = self.publishers.pop(key) + if self._manager is not None and self._manager.started: + publisher._link.close() def build_address_from_link(self, link: link.Link): @@ -41,23 +107,3 @@ class Context: return address - def match_address(self, l: link.Link, event): - - if not event \ - or not event.message \ - or not event.message.address: - return False - - address = self.build_address_from_link(l) - return address == event.message.address - - def build_address(self, *actions, topic=False): - - if len(actions) <= 0: - return self.base - - address = f"{self.base}.{'.'.join(actions)}" - if topic: - address = f"topic://{address}" - - return address diff --git a/exn/core/handler.py b/exn/core/handler.py new file mode 100644 index 0000000..8fd1bfb --- /dev/null +++ b/exn/core/handler.py @@ -0,0 +1,11 @@ +import logging + +from proton import Message + +_logger = logging.getLogger(__name__) + + +class Handler: + + def on_message(self, key, address, body, message: Message, context=None): + _logger.info(f"You should really override this... {key}=>{address}") diff --git a/exn/core/link.py b/exn/core/link.py index 65d6395..bf91199 100644 --- a/exn/core/link.py +++ b/exn/core/link.py @@ -1,16 +1,18 @@ + from proton import Link as pLink + class Link: fqdn=False - def __init__(self, key, address, topic=False, fqdn=False): + def __init__(self, key, address, topic=False, fqdn=False): + super().__init__() self.key = key self.address = address - self._link = None self.topic= topic self.fqdn= fqdn - + self._link = None def set(self, link:pLink): # The proton container creates a sender diff --git a/exn/core/manager.py b/exn/core/manager.py new file mode 100644 index 0000000..dd4025e --- /dev/null +++ b/exn/core/manager.py @@ -0,0 +1,71 @@ +import logging + +from proton import Event, Connection,Session + +from proton.handlers import MessagingHandler +from proton.reactor import Container + +from .consumer import Consumer +from .publisher import Publisher + +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.DEBUG) + + +class SessionPerConsumer(object): + def session(self, connection: Connection) -> Session: + session = connection.session() + session.open() + return session + + +class Manager(MessagingHandler): + uri = None + started = False + container = None + connection = None + + _on_ready = None + + def __init__(self, uri): + super(Manager, self).__init__() + self.uri = uri + + def start(self): + _logger.info(f"[manager] starting") + self.container = Container(self) + self.container.run() + + def on_start(self, event: Event) -> None: + self.connection = self.container.connect(self.uri) + self.connection._session_policy=SessionPerConsumer() + + self.started=True + _logger.debug(f"[manager] on_start") + if self._on_ready is not None: + self._on_ready() + + def on_message(self, event: Event) -> None: + _logger.warning(f"[manager] received generic on_message make sure you have set up your handlers" + f" properly ") + + def close(self): + _logger.info(f"[manager] closing") + if self.container: + self.container.stop() + + if self.connection: + self.connection.close() + + def start_publisher(self, context, publisher: Publisher): + address = context.build_address_from_link(publisher) + _logger.info(f"[manager] starting publisher {publisher.key} => {address}") + publisher.set(self.container.create_sender(self.connection, address)) + if hasattr(publisher, "delay"): + _logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}") + self.container.schedule(publisher.delay, handler=publisher) + + def start_consumer(self, context, consumer: Consumer): + address = context.build_address_from_link(consumer) + _logger.info(f"[manager] starting consumer {consumer.key} => {address}") + consumer.set(self.container.create_receiver(self.connection, address , handler=consumer)) diff --git a/exn/core/publisher.py b/exn/core/publisher.py index 2768c5d..e15ec6a 100644 --- a/exn/core/publisher.py +++ b/exn/core/publisher.py @@ -10,12 +10,15 @@ _logger = logging.getLogger(__name__) class Publisher(link.Link): - def send(self, body=None): + def send(self, body=None, application=None): if not body: body = {} - _logger.debug(f"{self.address} Sending {body} ") + _logger.info(f"[{self.key}] sending to {self._link.target.address} for application={application} - {body} ") msg = self._prepare_message(body) + if application: + msg.subject = application + self._link.send(msg) def _prepare_message(self, body=None): diff --git a/exn/core/schedule_publisher.py b/exn/core/schedule_publisher.py index 9cf8b64..f7dc10c 100644 --- a/exn/core/schedule_publisher.py +++ b/exn/core/schedule_publisher.py @@ -1,14 +1,24 @@ import logging -from . import publisher +from proton.handlers import MessagingHandler +from .publisher import Publisher _logger = logging.getLogger(__name__) -class Publisher(publisher.Publisher): +class Publisher(Publisher, MessagingHandler): send_next = False delay = 15 - def __init__(self, delay, key, address, topic=False): - super(Publisher, self).__init__(key, address, topic) + def __init__(self, delay, key, address, application=None, topic=False, fqdn=False): + super(Publisher, self).__init__(key, address, topic,fqdn) self.delay = delay + self.application = application + + def on_timer_task(self, event): + _logger.debug(f"[manager] on_timer_task") + self.send() + event.reactor.schedule(self.delay, self) + + def send(self, body=None, application=None): + super(Publisher, self).send(body, self.application) diff --git a/exn/core/state_publisher.py b/exn/core/state_publisher.py index 8ea6af7..f8ae1cb 100644 --- a/exn/core/state_publisher.py +++ b/exn/core/state_publisher.py @@ -27,19 +27,19 @@ class Publisher(publisher.Publisher): self.send({"state": message_type,"message": None}) def starting(self): - self._send_message(States.STARTING) + self._send_message(States.STARTING.value) def started(self): - self._send_message(States.STARTED) + self._send_message(States.STARTED.value) def ready(self): - self._send_message(States.READY) + self._send_message(States.READY.value) def stopping(self): - self._send_message(States.STOPPING) + self._send_message(States.STOPPING.value) def stopped(self): - self._send_message(States.STOPPED) + self._send_message(States.STOPPED.value) def custom(self, state): self._send_message(state) diff --git a/exn/handler/__init__.py b/exn/handler/__init__.py new file mode 100644 index 0000000..a7a404b --- /dev/null +++ b/exn/handler/__init__.py @@ -0,0 +1,2 @@ + +from . import connector_handler \ No newline at end of file diff --git a/exn/handler/connector_handler.py b/exn/handler/connector_handler.py new file mode 100644 index 0000000..dafe9d8 --- /dev/null +++ b/exn/handler/connector_handler.py @@ -0,0 +1,12 @@ + +import logging + + +_logger = logging.getLogger(__name__) + +class ConnectorHandler: + + def ready(self, context): + pass + +