116 lines
4.1 KiB
Python
116 lines
4.1 KiB
Python
import logging
|
|
import threading
|
|
import time
|
|
import types
|
|
|
|
import proton
|
|
from proton import Event, Connection, Session, Message
|
|
|
|
from proton.handlers import MessagingHandler
|
|
from proton.reactor import Container,ReceiverOption,Filter,Selector
|
|
|
|
from .consumer import Consumer
|
|
from .publisher import Publisher
|
|
from .handler import Handler
|
|
from .synced_publisher import SyncedPublisher
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
_logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
class SessionPerConsumer(object):
|
|
def session(self, connection: Connection) -> Session:
|
|
session = connection.session()
|
|
session.open()
|
|
return session
|
|
|
|
|
|
class Manager(MessagingHandler):
|
|
uri = None
|
|
started = False
|
|
container = None
|
|
connection = None
|
|
_on_ready = None
|
|
|
|
def __init__(self, uri):
|
|
super(Manager, self).__init__()
|
|
self.uri = uri
|
|
|
|
def start(self):
|
|
_logger.info(f"[manager] starting")
|
|
self.container = Container(self)
|
|
self.container.run()
|
|
|
|
def on_start(self, event: Event) -> None:
|
|
self.connection = self.container.connect(self.uri,)
|
|
self.connection._session_policy = SessionPerConsumer()
|
|
def connection_state():
|
|
|
|
while self.connection.state != 18:
|
|
time.sleep(0.05)
|
|
self.started = True
|
|
_logger.debug(f"[manager] on_start")
|
|
if self._on_ready is not None:
|
|
self._on_ready()
|
|
|
|
threading.Thread(target=connection_state).start()
|
|
|
|
def on_message(self, event: Event) -> None:
|
|
_logger.warning(f"[manager] received generic on_message make sure you have set up your handlers"
|
|
f" properly ")
|
|
|
|
def close(self):
|
|
_logger.info(f"[manager] closing")
|
|
if self.container:
|
|
self.container.stop()
|
|
|
|
if self.connection:
|
|
self.connection.close()
|
|
|
|
def start_publisher(self, context, publisher: Publisher):
|
|
address = context.build_address_from_link(publisher)
|
|
_logger.info(f"[manager] starting publisher {publisher.key} => {address}")
|
|
publisher.set(self.container.create_sender(self.connection, address))
|
|
publisher.context = context
|
|
if hasattr(publisher, "delay"):
|
|
_logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}")
|
|
self.container.schedule(publisher.delay, handler=publisher)
|
|
|
|
if hasattr(publisher, "reply_address"):
|
|
_logger.info(f"[manager] starting Synced consumer for {publisher.key} => {publisher.reply_address}")
|
|
def on_my_message(self, key, address, body, message: Message, context=None):
|
|
_logger.info(f"[{publisher.key}] handler received {key} => {message.correlation_id}")
|
|
if publisher.match_correlation_id(message.correlation_id):
|
|
_logger.info(f"[{publisher.key}] handler received {key} / matched => response {body} ")
|
|
publisher._replied = body
|
|
|
|
r_handler = Handler()
|
|
r_handler.on_message= types.MethodType(on_my_message,r_handler)
|
|
self.start_consumer(
|
|
context,
|
|
Consumer(publisher.key+"-reply",
|
|
publisher.reply_address,
|
|
handler=r_handler,
|
|
topic=publisher.reply_topic,
|
|
fqdn=publisher.reply_fqdn
|
|
)
|
|
)
|
|
|
|
def start_consumer(self, context, consumer: Consumer):
|
|
address = context.build_address_from_link(consumer)
|
|
consumer.context = context
|
|
if consumer.application:
|
|
_logger.info(f"[manager] starting consumer {consumer.key} => {address} and application={consumer.application}")
|
|
consumer.set(self.container.create_receiver(
|
|
self.connection,
|
|
address,
|
|
handler=consumer,
|
|
options=Selector(u"application = '"+consumer.application+"'"))
|
|
)
|
|
|
|
else:
|
|
_logger.info(f"[manager] starting consumer {consumer.key} => {address}")
|
|
consumer.set(self.container.create_receiver(self.connection, address, handler=consumer))
|
|
|
|
|