72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
import logging
|
|
|
|
from proton import Event, Connection,Session
|
|
|
|
from proton.handlers import MessagingHandler
|
|
from proton.reactor import Container
|
|
|
|
from .consumer import Consumer
|
|
from .publisher import Publisher
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
_logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
class SessionPerConsumer(object):
|
|
def session(self, connection: Connection) -> Session:
|
|
session = connection.session()
|
|
session.open()
|
|
return session
|
|
|
|
|
|
class Manager(MessagingHandler):
|
|
uri = None
|
|
started = False
|
|
container = None
|
|
connection = None
|
|
|
|
_on_ready = None
|
|
|
|
def __init__(self, uri):
|
|
super(Manager, self).__init__()
|
|
self.uri = uri
|
|
|
|
def start(self):
|
|
_logger.info(f"[manager] starting")
|
|
self.container = Container(self)
|
|
self.container.run()
|
|
|
|
def on_start(self, event: Event) -> None:
|
|
self.connection = self.container.connect(self.uri)
|
|
self.connection._session_policy=SessionPerConsumer()
|
|
|
|
self.started=True
|
|
_logger.debug(f"[manager] on_start")
|
|
if self._on_ready is not None:
|
|
self._on_ready()
|
|
|
|
def on_message(self, event: Event) -> None:
|
|
_logger.warning(f"[manager] received generic on_message make sure you have set up your handlers"
|
|
f" properly ")
|
|
|
|
def close(self):
|
|
_logger.info(f"[manager] closing")
|
|
if self.container:
|
|
self.container.stop()
|
|
|
|
if self.connection:
|
|
self.connection.close()
|
|
|
|
def start_publisher(self, context, publisher: Publisher):
|
|
address = context.build_address_from_link(publisher)
|
|
_logger.info(f"[manager] starting publisher {publisher.key} => {address}")
|
|
publisher.set(self.container.create_sender(self.connection, address))
|
|
if hasattr(publisher, "delay"):
|
|
_logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}")
|
|
self.container.schedule(publisher.delay, handler=publisher)
|
|
|
|
def start_consumer(self, context, consumer: Consumer):
|
|
address = context.build_address_from_link(consumer)
|
|
_logger.info(f"[manager] starting consumer {consumer.key} => {address}")
|
|
consumer.set(self.container.create_receiver(self.connection, address , handler=consumer))
|