Gregory Koronakos c504f94705 rank node candidates
Change-Id: I260f8105cdd675d9475a9179149a445c4f56fa17
2024-03-20 01:27:00 +02:00

116 lines
4.1 KiB
Python

import logging
import threading
import time
import types
import proton
from proton import Event, Connection, Session, Message
from proton.handlers import MessagingHandler
from proton.reactor import Container,ReceiverOption,Filter,Selector
from .consumer import Consumer
from .publisher import Publisher
from .handler import Handler
from .synced_publisher import SyncedPublisher
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
class SessionPerConsumer(object):
def session(self, connection: Connection) -> Session:
session = connection.session()
session.open()
return session
class Manager(MessagingHandler):
uri = None
started = False
container = None
connection = None
_on_ready = None
def __init__(self, uri):
super(Manager, self).__init__()
self.uri = uri
def start(self):
_logger.info(f"[manager] starting")
self.container = Container(self)
self.container.run()
def on_start(self, event: Event) -> None:
self.connection = self.container.connect(self.uri,)
self.connection._session_policy = SessionPerConsumer()
def connection_state():
while self.connection.state != 18:
time.sleep(0.05)
self.started = True
_logger.debug(f"[manager] on_start")
if self._on_ready is not None:
self._on_ready()
threading.Thread(target=connection_state).start()
def on_message(self, event: Event) -> None:
_logger.warning(f"[manager] received generic on_message make sure you have set up your handlers"
f" properly ")
def close(self):
_logger.info(f"[manager] closing")
if self.container:
self.container.stop()
if self.connection:
self.connection.close()
def start_publisher(self, context, publisher: Publisher):
address = context.build_address_from_link(publisher)
_logger.info(f"[manager] starting publisher {publisher.key} => {address}")
publisher.set(self.container.create_sender(self.connection, address))
publisher.context = context
if hasattr(publisher, "delay"):
_logger.debug(f"{context.base} registering timer {hasattr(publisher, 'delay')}")
self.container.schedule(publisher.delay, handler=publisher)
if hasattr(publisher, "reply_address"):
_logger.info(f"[manager] starting Synced consumer for {publisher.key} => {publisher.reply_address}")
def on_my_message(self, key, address, body, message: Message, context=None):
_logger.info(f"[{publisher.key}] handler received {key} => {message.correlation_id}")
if publisher.match_correlation_id(message.correlation_id):
_logger.info(f"[{publisher.key}] handler received {key} / matched => response {body} ")
publisher._replied = body
r_handler = Handler()
r_handler.on_message= types.MethodType(on_my_message,r_handler)
self.start_consumer(
context,
Consumer(publisher.key+"-reply",
publisher.reply_address,
handler=r_handler,
topic=publisher.reply_topic,
fqdn=publisher.reply_fqdn
)
)
def start_consumer(self, context, consumer: Consumer):
address = context.build_address_from_link(consumer)
consumer.context = context
if consumer.application:
_logger.info(f"[manager] starting consumer {consumer.key} => {address} and application={consumer.application}")
consumer.set(self.container.create_receiver(
self.connection,
address,
handler=consumer,
options=Selector(u"application = '"+consumer.application+"'"))
)
else:
_logger.info(f"[manager] starting consumer {consumer.key} => {address}")
consumer.set(self.container.create_receiver(self.connection, address, handler=consumer))