Aligned python and java, dynamic consumers for python
Change-Id: I2b63a46431adc4adcb0a4fcbf12012f0077c3414
This commit is contained in:
parent
19bdfeea69
commit
2144911594
@ -1,5 +1,15 @@
|
|||||||
|
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from exn.core.publisher import Publisher
|
||||||
|
from exn.handler.connector_handler import ConnectorHandler
|
||||||
|
|
||||||
|
sys.path.insert(0,'../exn')
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
from exn import connector, core
|
from exn import connector, core
|
||||||
|
|
||||||
@ -7,7 +17,7 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
|
|||||||
logging.getLogger('exn.connector').setLevel(logging.DEBUG)
|
logging.getLogger('exn.connector').setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
class MyHandler(connector.ConnectorHandler):
|
class Bootstrap(ConnectorHandler):
|
||||||
|
|
||||||
def ready(self, context):
|
def ready(self, context):
|
||||||
if context.has_publisher('state'):
|
if context.has_publisher('state'):
|
||||||
@ -19,33 +29,39 @@ class MyHandler(connector.ConnectorHandler):
|
|||||||
|
|
||||||
context.publishers['config'].send({
|
context.publishers['config'].send({
|
||||||
'hello': 'world'
|
'hello': 'world'
|
||||||
})
|
},application="one")
|
||||||
context.publishers['preferences'].send()
|
|
||||||
|
context.publishers['config'].send({
|
||||||
|
'good': 'bye'
|
||||||
|
},application="two")
|
||||||
|
|
||||||
|
if context.has_publisher('preferences'):
|
||||||
|
context.publishers['preferences'].send()
|
||||||
|
|
||||||
|
|
||||||
class MyPublisher(core.publisher.Publisher):
|
class MyPublisher(Publisher):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__('preferences', 'preferences.changed', True)
|
super().__init__( 'preferences', 'preferences', topic=True)
|
||||||
|
|
||||||
def send(self, body={}):
|
def send(self):
|
||||||
body.update({
|
super(MyPublisher, self).send({
|
||||||
"preferences": {
|
"preferences": {
|
||||||
"dark_mode": True
|
"dark_mode": True
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
super(MyPublisher, self).send(body)
|
|
||||||
|
|
||||||
|
|
||||||
connector = connector.EXN('ui', handler=MyHandler()
|
connector = connector.EXN('ui', handler=Bootstrap()
|
||||||
, publishers=[
|
, publishers=[
|
||||||
core.publisher.Publisher('config', 'config', True),
|
core.publisher.Publisher('config', 'config', True),
|
||||||
MyPublisher(),
|
MyPublisher()
|
||||||
],
|
],
|
||||||
enable_health=True, enable_state=False
|
enable_health=True, enable_state=True
|
||||||
,url='localhost'
|
,url='localhost'
|
||||||
,port=5672
|
,port=5672
|
||||||
,username="admin"
|
,username="admin"
|
||||||
,password="adming"
|
,password="admin"
|
||||||
)
|
)
|
||||||
|
|
||||||
connector.start()
|
connector.start()
|
||||||
|
@ -1,27 +1,64 @@
|
|||||||
|
import sys
|
||||||
|
sys.path.insert(0,'../exn')
|
||||||
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from exn import connector, core
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
from proton import Message
|
||||||
|
from exn.connector import EXN
|
||||||
|
from exn.core.consumer import Consumer
|
||||||
|
from exn.core.context import Context
|
||||||
|
from exn.core.handler import Handler
|
||||||
|
from exn.handler.connector_handler import ConnectorHandler
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
logging.getLogger('exn.connector').setLevel(logging.DEBUG)
|
logging.getLogger('exn.connector').setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
class Bootstrap(connector.ConnectorHandler):
|
class MyGeneralHandler(Handler):
|
||||||
|
def on_message(self, key, address, body, message: Message, context=None):
|
||||||
def on_message(self, key, address, body, context, **kwargs):
|
logging.info(f"[MyGeneralHandler] Received {key} => {address}: {body}")
|
||||||
logging.info(f"Received {key} => {address}")
|
|
||||||
if key == 'ui_health':
|
|
||||||
logging.info(f"I am healthy => {body}")
|
|
||||||
|
|
||||||
if key == 'ui_all':
|
|
||||||
logging.info(f"These are my preferences => {body}")
|
|
||||||
|
|
||||||
|
|
||||||
connector = connector.EXN('ui', handler=Bootstrap(),
|
class Bootstrap(ConnectorHandler):
|
||||||
|
context = None
|
||||||
|
|
||||||
|
def ready(self, context: Context):
|
||||||
|
self.context = context
|
||||||
|
# do work here
|
||||||
|
|
||||||
|
self.context.register_consumers(
|
||||||
|
Consumer('ui_health', 'health', handler=my_general_handler, topic=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class MyConfigHandler(Handler):
|
||||||
|
def on_message(self, key, address, body, message: Message, context=None):
|
||||||
|
logging.info(f"[MyConfigHandler{self}] Received {key} => {address}: {body}")
|
||||||
|
|
||||||
|
|
||||||
|
my_general_handler = MyGeneralHandler()
|
||||||
|
|
||||||
|
connector = EXN('ui', handler=Bootstrap(),
|
||||||
consumers=[
|
consumers=[
|
||||||
core.consumer.Consumer('ui_health', 'health', topic=True),
|
Consumer('ui_all', 'eu.nebulouscloud.ui.preferences.>',
|
||||||
core.consumer.Consumer('ui_all', 'eu.nebulouscloud.ui.preferences.>', topic=True,
|
handler=my_general_handler,
|
||||||
fqdn=True)
|
topic=True,
|
||||||
|
fqdn=True),
|
||||||
|
Consumer('config_two', 'config',
|
||||||
|
handler=MyConfigHandler(),
|
||||||
|
application="two",
|
||||||
|
topic=True,
|
||||||
|
),
|
||||||
|
Consumer('config_one', 'config',
|
||||||
|
handler=MyConfigHandler(),
|
||||||
|
application="one",
|
||||||
|
topic=True,
|
||||||
|
),
|
||||||
|
|
||||||
])
|
])
|
||||||
|
|
||||||
connector.start()
|
connector.start()
|
||||||
|
@ -1 +1,6 @@
|
|||||||
|
|
||||||
|
|
||||||
|
from . import core
|
||||||
|
from . import handler
|
||||||
|
from . import settings
|
||||||
from . import connector
|
from . import connector
|
135
exn/connector.py
135
exn/connector.py
@ -1,117 +1,41 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from proton.handlers import MessagingHandler
|
|
||||||
from proton.reactor import Container
|
from proton.reactor import Container
|
||||||
|
|
||||||
from .core import context as core_context, state_publisher, schedule_publisher
|
from exn.core import state_publisher, schedule_publisher
|
||||||
|
from exn.core.context import Context
|
||||||
|
from .core.manager import Manager
|
||||||
from .settings import base
|
from .settings import base
|
||||||
|
from .handler import connector_handler
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
class ConnectorHandler:
|
|
||||||
def __init__(self):
|
|
||||||
self.initialized = False
|
|
||||||
|
|
||||||
|
|
||||||
def set_ready(self,ready, ctx:core_context.Context):
|
|
||||||
self.initialized = ready
|
|
||||||
self.ready(ctx)
|
|
||||||
|
|
||||||
def ready(self, ctx:core_context.Context):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_message(self, key, address, body, context, **kwargs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class CoreHandler(MessagingHandler):
|
|
||||||
|
|
||||||
def __init__(self,
|
|
||||||
context,
|
|
||||||
handler: ConnectorHandler,
|
|
||||||
publishers = [],
|
|
||||||
consumers = [],
|
|
||||||
):
|
|
||||||
super(CoreHandler, self).__init__()
|
|
||||||
self.context=context
|
|
||||||
self.publishers=publishers
|
|
||||||
self.consumers=consumers
|
|
||||||
self.handler = handler
|
|
||||||
self.conn = None
|
|
||||||
|
|
||||||
def on_start(self, event) -> None:
|
|
||||||
|
|
||||||
self.conn = event.container.connect(self.context.connection)
|
|
||||||
for publisher in self.publishers:
|
|
||||||
_logger.info(f"{publisher.address} registering sender")
|
|
||||||
address = self.context.build_address_from_link(publisher)
|
|
||||||
publisher.set(event.container.create_sender(self.conn,address))
|
|
||||||
self.context.register_publisher(publisher)
|
|
||||||
_logger.debug(f"{self.context.base} Registering timer { hasattr(publisher, 'delay')}")
|
|
||||||
if hasattr(publisher, "delay"):
|
|
||||||
_logger.debug(f"{self.context.base} Registering timer")
|
|
||||||
event.reactor.schedule(publisher.delay, self)
|
|
||||||
|
|
||||||
for consumer in self.consumers:
|
|
||||||
address = self.context.build_address_from_link(consumer)
|
|
||||||
_logger.info(f"{self.context.base} Registering consumer {address}")
|
|
||||||
consumer.set(event.container.create_receiver(self.conn, address))
|
|
||||||
self.context.register_consumers(consumer)
|
|
||||||
|
|
||||||
def on_sendable(self, event):
|
|
||||||
if not self.handler.initialized:
|
|
||||||
self.handler.set_ready(True, self.context)
|
|
||||||
|
|
||||||
def on_timer_task(self, event):
|
|
||||||
_logger.debug(f"{self.context.base} On timer")
|
|
||||||
for publisher in self._delay_publishers():
|
|
||||||
publisher.send()
|
|
||||||
event.reactor.schedule(publisher.delay, self)
|
|
||||||
|
|
||||||
def on_message(self, event):
|
|
||||||
try:
|
|
||||||
for consumer in self.consumers:
|
|
||||||
if consumer.should_handle(event):
|
|
||||||
_logger.debug(f"Received message: {event.message.address}")
|
|
||||||
self.handler.on_message(consumer.key, event.message.address, event.message.body, self.context, event=event)
|
|
||||||
except Exception as e:
|
|
||||||
_logger.error(f"Received message: {e}")
|
|
||||||
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
if self.conn:
|
|
||||||
self.conn.close()
|
|
||||||
else:
|
|
||||||
_logger.warning(f"{self.context.base} No open connection")
|
|
||||||
|
|
||||||
def _delay_publishers(self):
|
|
||||||
return [p for p in self.publishers if hasattr(p,'delay')]
|
|
||||||
|
|
||||||
|
|
||||||
class EXN:
|
class EXN:
|
||||||
|
|
||||||
|
context = None
|
||||||
|
container = None
|
||||||
|
|
||||||
def __init__(self, component=None,
|
def __init__(self, component=None,
|
||||||
handler:ConnectorHandler = None,
|
handler:connector_handler.ConnectorHandler = None,
|
||||||
publishers=[],
|
publishers=None,
|
||||||
consumers=[],
|
consumers=None,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
|
|
||||||
# Load .env file
|
# Load .env file
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
# Validate and set connector
|
# Validate and set connector
|
||||||
if not component:
|
if not component:
|
||||||
_logger.error("Component cannot be empty or None")
|
_logger.error("Component cannot be empty or None")
|
||||||
raise ValueError("Component cannot be empty or None")
|
raise ValueError("Component cannot be empty or None")
|
||||||
self.component = component
|
self.component = component
|
||||||
self.handler = handler
|
|
||||||
|
|
||||||
self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL'))
|
self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL'))
|
||||||
self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
|
self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
|
||||||
self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME'))
|
self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME'))
|
||||||
self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
|
self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
|
||||||
|
self.handler = handler
|
||||||
|
|
||||||
# Validate attributes
|
# Validate attributes
|
||||||
if not self.url:
|
if not self.url:
|
||||||
@ -127,29 +51,34 @@ class EXN:
|
|||||||
_logger.error("PASSWORD cannot be empty or None")
|
_logger.error("PASSWORD cannot be empty or None")
|
||||||
raise ValueError("PASSWORD cannot be empty or None")
|
raise ValueError("PASSWORD cannot be empty or None")
|
||||||
|
|
||||||
ctx = core_context.Context(
|
self.context = Context(base=f"{base.NEBULOUS_BASE_NAME}.{self.component}")
|
||||||
connection=f"{self.url}:{self.port}",
|
|
||||||
base=f"{base.NEBULOUS_BASE_NAME}.{self.component}",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
if not publishers:
|
||||||
|
publishers = []
|
||||||
|
|
||||||
|
if not consumers:
|
||||||
|
consumers = []
|
||||||
|
|
||||||
|
compiled_publishers = publishers
|
||||||
if kwargs.get("enable_state",False):
|
if kwargs.get("enable_state",False):
|
||||||
publishers.append(state_publisher.Publisher())
|
compiled_publishers.append(state_publisher.Publisher())
|
||||||
|
|
||||||
if kwargs.get("enable_health",False):
|
if kwargs.get("enable_health",False):
|
||||||
publishers.append(schedule_publisher.Publisher(
|
compiled_publishers.append(schedule_publisher.Publisher(
|
||||||
base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
|
base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
|
||||||
'health',
|
'health',
|
||||||
'health',
|
'health',
|
||||||
True))
|
topic=True))
|
||||||
|
|
||||||
core_handler = CoreHandler(
|
for c in consumers:
|
||||||
ctx,
|
self.context.register_consumers(c)
|
||||||
handler,
|
|
||||||
publishers,
|
|
||||||
consumers
|
|
||||||
)
|
|
||||||
|
|
||||||
self.container = Container(core_handler)
|
for p in compiled_publishers:
|
||||||
|
self.context.register_publisher(p)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.container.run()
|
self.context.start(Manager(f"{self.url}:{self.port}"),self.handler)
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.context.stop()
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
|
|
||||||
|
|
||||||
from . import context
|
from . import context
|
||||||
|
from . import handler
|
||||||
from . import publisher
|
from . import publisher
|
||||||
from . import consumer
|
from . import consumer
|
||||||
from . import state_publisher
|
from . import state_publisher
|
||||||
|
@ -1,17 +1,43 @@
|
|||||||
import datetime
|
|
||||||
|
|
||||||
from proton import Message, Event
|
|
||||||
from . import link
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from proton import Event
|
||||||
|
from .handler import Handler
|
||||||
|
|
||||||
|
from . import link
|
||||||
|
|
||||||
|
from proton.handlers import MessagingHandler
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
_logger.setLevel(level=logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
class Consumer(link.Link):
|
class Consumer(link.Link, MessagingHandler):
|
||||||
|
application = None
|
||||||
|
|
||||||
def on_message(self, body, **kwargs):
|
def __init__(self, key, address, handler: Handler, application=None, topic=False, fqdn=False):
|
||||||
_logger.debug(f"{self.address} Got {body} ")
|
super(Consumer, self).__init__(key, address, topic, fqdn)
|
||||||
|
self.application = application
|
||||||
|
self.handler = handler
|
||||||
|
self.handler._consumer = self
|
||||||
|
|
||||||
def should_handle(self, event: Event):
|
def should_handle(self, event: Event):
|
||||||
if event.link == self._link:
|
|
||||||
return True
|
should = event.link.name == self._link.name and \
|
||||||
|
(self.application is None or event.message.subject == self.application)
|
||||||
|
|
||||||
|
_logger.debug(f"[{self.key}] checking if link is the same {event.link.name}={self._link.name} "
|
||||||
|
f" and application {self.application}={event.message.subject} == {should}")
|
||||||
|
|
||||||
|
return should
|
||||||
|
|
||||||
|
def on_start(self, event: Event) -> None:
|
||||||
|
_logger.debug(f"[{self.key}] on_start")
|
||||||
|
|
||||||
|
def on_message(self, event):
|
||||||
|
_logger.debug(f"[{self.key}] handling event with address => {event.message.address}")
|
||||||
|
try:
|
||||||
|
if self.should_handle(event):
|
||||||
|
self.handler.on_message(self.key, event.message.address, event.message.body, event.message)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
_logger.error(f"Received message: {e}")
|
||||||
|
@ -1,14 +1,60 @@
|
|||||||
from . import link
|
import logging
|
||||||
|
|
||||||
|
from proton.reactor import Container
|
||||||
|
|
||||||
|
from . import link
|
||||||
|
from .manager import Manager
|
||||||
|
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
_logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
class Context:
|
class Context:
|
||||||
|
|
||||||
def __init__(self, connection, base):
|
base = None
|
||||||
|
handler = None
|
||||||
|
publishers = {}
|
||||||
|
consumers = {}
|
||||||
|
_manager = None
|
||||||
|
|
||||||
|
def __init__(self, base):
|
||||||
|
|
||||||
self.connection = connection
|
|
||||||
self.base = base
|
self.base = base
|
||||||
self.publishers = {}
|
|
||||||
self.consumers = {}
|
def start(self, manager:Manager, handler):
|
||||||
|
self._manager = manager
|
||||||
|
|
||||||
|
def on_ready():
|
||||||
|
_logger.debug("[context] on_ready" )
|
||||||
|
for key,publisher in self.publishers.items():
|
||||||
|
self._manager.start_publisher(self,publisher)
|
||||||
|
|
||||||
|
for key,consumer in self.consumers.items():
|
||||||
|
self._manager.start_consumer(self,consumer)
|
||||||
|
|
||||||
|
handler.ready(context=self)
|
||||||
|
|
||||||
|
self._manager._on_ready=on_ready
|
||||||
|
self._manager.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
if self._manager is not None and self._manager.started:
|
||||||
|
for key,publisher in self.publishers:
|
||||||
|
publisher._link.close()
|
||||||
|
for key,consumer in self.consumers:
|
||||||
|
consumer._link.close()
|
||||||
|
|
||||||
|
self._manager.close()
|
||||||
|
|
||||||
|
|
||||||
|
def register_publisher(self, publisher):
|
||||||
|
if publisher.key in self.publishers:
|
||||||
|
_logger.warning("[context] Trying to register publisher that already exists")
|
||||||
|
return
|
||||||
|
_logger.info(f"[context] registering publisher {publisher.key} {publisher.address}" )
|
||||||
|
self.publishers[publisher.key] = publisher
|
||||||
|
if self._manager is not None and self._manager.started:
|
||||||
|
self._manager.start_publisher(self,publisher)
|
||||||
|
|
||||||
def get_publisher(self, key):
|
def get_publisher(self, key):
|
||||||
if key in self.publishers:
|
if key in self.publishers:
|
||||||
@ -21,11 +67,31 @@ class Context:
|
|||||||
def has_consumer(self, key):
|
def has_consumer(self, key):
|
||||||
return key in self.consumers
|
return key in self.consumers
|
||||||
|
|
||||||
def register_publisher(self, publisher):
|
|
||||||
self.publishers[publisher.key] = publisher
|
|
||||||
|
|
||||||
def register_consumers(self, consumer):
|
def register_consumers(self, consumer):
|
||||||
|
if consumer.key in self.consumers:
|
||||||
|
_logger.warning("[context] Trying to register consumer that already exists")
|
||||||
|
return
|
||||||
|
|
||||||
self.consumers[consumer.key] = consumer
|
self.consumers[consumer.key] = consumer
|
||||||
|
if self._manager is not None and self._manager.started:
|
||||||
|
self._manager.start_consumer(self,consumer)
|
||||||
|
|
||||||
|
def unregister_consumer(self, key):
|
||||||
|
if not key in self.consumers:
|
||||||
|
_logger.warning("[context] Trying to unregister consumer that does not exists")
|
||||||
|
return
|
||||||
|
|
||||||
|
consumer = self.consumers.pop(key)
|
||||||
|
if self._manager is not None and self._manager.started:
|
||||||
|
consumer._link.close()
|
||||||
|
|
||||||
|
def unregister_publisher(self, key):
|
||||||
|
if not key in self.consumers:
|
||||||
|
_logger.warning("[context] Trying to unregister publisher that does not exists")
|
||||||
|
return
|
||||||
|
publisher = self.publishers.pop(key)
|
||||||
|
if self._manager is not None and self._manager.started:
|
||||||
|
publisher._link.close()
|
||||||
|
|
||||||
def build_address_from_link(self, link: link.Link):
|
def build_address_from_link(self, link: link.Link):
|
||||||
|
|
||||||
@ -41,23 +107,3 @@ class Context:
|
|||||||
|
|
||||||
return address
|
return address
|
||||||
|
|
||||||
def match_address(self, l: link.Link, event):
|
|
||||||
|
|
||||||
if not event \
|
|
||||||
or not event.message \
|
|
||||||
or not event.message.address:
|
|
||||||
return False
|
|
||||||
|
|
||||||
address = self.build_address_from_link(l)
|
|
||||||
return address == event.message.address
|
|
||||||
|
|
||||||
def build_address(self, *actions, topic=False):
|
|
||||||
|
|
||||||
if len(actions) <= 0:
|
|
||||||
return self.base
|
|
||||||
|
|
||||||
address = f"{self.base}.{'.'.join(actions)}"
|
|
||||||
if topic:
|
|
||||||
address = f"topic://{address}"
|
|
||||||
|
|
||||||
return address
|
|
||||||
|
11
exn/core/handler.py
Normal file
11
exn/core/handler.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from proton import Message
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Handler:
|
||||||
|
|
||||||
|
def on_message(self, key, address, body, message: Message, context=None):
|
||||||
|
_logger.info(f"You should really override this... {key}=>{address}")
|
@ -1,16 +1,18 @@
|
|||||||
|
|
||||||
|
|
||||||
from proton import Link as pLink
|
from proton import Link as pLink
|
||||||
|
|
||||||
class Link:
|
class Link:
|
||||||
|
|
||||||
fqdn=False
|
fqdn=False
|
||||||
def __init__(self, key, address, topic=False, fqdn=False):
|
|
||||||
|
|
||||||
|
def __init__(self, key, address, topic=False, fqdn=False):
|
||||||
|
super().__init__()
|
||||||
self.key = key
|
self.key = key
|
||||||
self.address = address
|
self.address = address
|
||||||
self._link = None
|
|
||||||
self.topic= topic
|
self.topic= topic
|
||||||
self.fqdn= fqdn
|
self.fqdn= fqdn
|
||||||
|
self._link = None
|
||||||
|
|
||||||
def set(self, link:pLink):
|
def set(self, link:pLink):
|
||||||
# The proton container creates a sender
|
# The proton container creates a sender
|
||||||
|
71
exn/core/manager.py
Normal file
71
exn/core/manager.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from proton import Event, Connection,Session
|
||||||
|
|
||||||
|
from proton.handlers import MessagingHandler
|
||||||
|
from proton.reactor import Container
|
||||||
|
|
||||||
|
from .consumer import Consumer
|
||||||
|
from .publisher import Publisher
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
_logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
class SessionPerConsumer(object):
|
||||||
|
def session(self, connection: Connection) -> Session:
|
||||||
|
session = connection.session()
|
||||||
|
session.open()
|
||||||
|
return session
|
||||||
|
|
||||||
|
|
||||||
|
class Manager(MessagingHandler):
|
||||||
|
uri = None
|
||||||
|
started = False
|
||||||
|
container = None
|
||||||
|
connection = None
|
||||||
|
|
||||||
|
_on_ready = None
|
||||||
|
|
||||||
|
def __init__(self, uri):
|
||||||
|
super(Manager, self).__init__()
|
||||||
|
self.uri = uri
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
_logger.info(f"[manager] starting")
|
||||||
|
self.container = Container(self)
|
||||||
|
self.container.run()
|
||||||
|
|
||||||
|
def on_start(self, event: Event) -> None:
|
||||||
|
self.connection = self.container.connect(self.uri)
|
||||||
|
self.connection._session_policy=SessionPerConsumer()
|
||||||
|
|
||||||
|
self.started=True
|
||||||
|
_logger.debug(f"[manager] on_start")
|
||||||
|
if self._on_ready is not None:
|
||||||
|
self._on_ready()
|
||||||
|
|
||||||
|
def on_message(self, event: Event) -> None:
|
||||||
|
_logger.warning(f"[manager] received generic on_message make sure you have set up your handlers"
|
||||||
|
f" properly ")
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
_logger.info(f"[manager] closing")
|
||||||
|
if self.container:
|
||||||
|
self.container.stop()
|
||||||
|
|
||||||
|
if self.connection:
|
||||||
|
self.connection.close()
|
||||||
|
|
||||||
|
def start_publisher(self, context, publisher: Publisher):
|
||||||
|
address = context.build_address_from_link(publisher)
|
||||||
|
_logger.info(f"[manager] starting publisher {publisher.key} => {address}")
|
||||||
|
publisher.set(self.container.create_sender(self.connection, address))
|
||||||
|
if hasattr(publisher, "delay"):
|
||||||
|
_logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}")
|
||||||
|
self.container.schedule(publisher.delay, handler=publisher)
|
||||||
|
|
||||||
|
def start_consumer(self, context, consumer: Consumer):
|
||||||
|
address = context.build_address_from_link(consumer)
|
||||||
|
_logger.info(f"[manager] starting consumer {consumer.key} => {address}")
|
||||||
|
consumer.set(self.container.create_receiver(self.connection, address , handler=consumer))
|
@ -10,12 +10,15 @@ _logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
class Publisher(link.Link):
|
class Publisher(link.Link):
|
||||||
|
|
||||||
def send(self, body=None):
|
def send(self, body=None, application=None):
|
||||||
if not body:
|
if not body:
|
||||||
body = {}
|
body = {}
|
||||||
|
|
||||||
_logger.debug(f"{self.address} Sending {body} ")
|
_logger.info(f"[{self.key}] sending to {self._link.target.address} for application={application} - {body} ")
|
||||||
msg = self._prepare_message(body)
|
msg = self._prepare_message(body)
|
||||||
|
if application:
|
||||||
|
msg.subject = application
|
||||||
|
|
||||||
self._link.send(msg)
|
self._link.send(msg)
|
||||||
|
|
||||||
def _prepare_message(self, body=None):
|
def _prepare_message(self, body=None):
|
||||||
|
@ -1,14 +1,24 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from . import publisher
|
from proton.handlers import MessagingHandler
|
||||||
|
from .publisher import Publisher
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Publisher(publisher.Publisher):
|
class Publisher(Publisher, MessagingHandler):
|
||||||
send_next = False
|
send_next = False
|
||||||
delay = 15
|
delay = 15
|
||||||
|
|
||||||
def __init__(self, delay, key, address, topic=False):
|
def __init__(self, delay, key, address, application=None, topic=False, fqdn=False):
|
||||||
super(Publisher, self).__init__(key, address, topic)
|
super(Publisher, self).__init__(key, address, topic,fqdn)
|
||||||
self.delay = delay
|
self.delay = delay
|
||||||
|
self.application = application
|
||||||
|
|
||||||
|
def on_timer_task(self, event):
|
||||||
|
_logger.debug(f"[manager] on_timer_task")
|
||||||
|
self.send()
|
||||||
|
event.reactor.schedule(self.delay, self)
|
||||||
|
|
||||||
|
def send(self, body=None, application=None):
|
||||||
|
super(Publisher, self).send(body, self.application)
|
||||||
|
@ -27,19 +27,19 @@ class Publisher(publisher.Publisher):
|
|||||||
self.send({"state": message_type,"message": None})
|
self.send({"state": message_type,"message": None})
|
||||||
|
|
||||||
def starting(self):
|
def starting(self):
|
||||||
self._send_message(States.STARTING)
|
self._send_message(States.STARTING.value)
|
||||||
|
|
||||||
def started(self):
|
def started(self):
|
||||||
self._send_message(States.STARTED)
|
self._send_message(States.STARTED.value)
|
||||||
|
|
||||||
def ready(self):
|
def ready(self):
|
||||||
self._send_message(States.READY)
|
self._send_message(States.READY.value)
|
||||||
|
|
||||||
def stopping(self):
|
def stopping(self):
|
||||||
self._send_message(States.STOPPING)
|
self._send_message(States.STOPPING.value)
|
||||||
|
|
||||||
def stopped(self):
|
def stopped(self):
|
||||||
self._send_message(States.STOPPED)
|
self._send_message(States.STOPPED.value)
|
||||||
|
|
||||||
def custom(self, state):
|
def custom(self, state):
|
||||||
self._send_message(state)
|
self._send_message(state)
|
||||||
|
2
exn/handler/__init__.py
Normal file
2
exn/handler/__init__.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
|
||||||
|
from . import connector_handler
|
12
exn/handler/connector_handler.py
Normal file
12
exn/handler/connector_handler.py
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class ConnectorHandler:
|
||||||
|
|
||||||
|
def ready(self, context):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user