Allow fake driver to consume multiple topics
This patch allow the fake driver to comsume multiple topics with one listener. Partial implements blueprint notification-subscriber-server Change-Id: Ib52dc181e10b487854fbb398eda9f758232a1251
This commit is contained in:
parent
f81cde600b
commit
11a90eabc9
@ -66,8 +66,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||||||
|
|
||||||
class AMQPListener(base.Listener):
|
class AMQPListener(base.Listener):
|
||||||
|
|
||||||
def __init__(self, driver, target, conn):
|
def __init__(self, driver, conn):
|
||||||
super(AMQPListener, self).__init__(driver, target)
|
super(AMQPListener, self).__init__(driver)
|
||||||
self.conn = conn
|
self.conn = conn
|
||||||
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
self.msg_id_cache = rpc_amqp._MsgIdCache()
|
||||||
self.incoming = []
|
self.incoming = []
|
||||||
@ -395,7 +395,7 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
conn = self._get_connection(pooled=False)
|
conn = self._get_connection(pooled=False)
|
||||||
|
|
||||||
listener = AMQPListener(self, target, conn)
|
listener = AMQPListener(self, conn)
|
||||||
|
|
||||||
conn.declare_topic_consumer(target.topic, listener)
|
conn.declare_topic_consumer(target.topic, listener)
|
||||||
conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),
|
conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),
|
||||||
|
@ -41,10 +41,9 @@ class IncomingMessage(object):
|
|||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class Listener(object):
|
class Listener(object):
|
||||||
|
|
||||||
def __init__(self, driver, target):
|
def __init__(self, driver):
|
||||||
self.conf = driver.conf
|
self.conf = driver.conf
|
||||||
self.driver = driver
|
self.driver = driver
|
||||||
self.target = target
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def poll(self):
|
def poll(self):
|
||||||
|
@ -39,13 +39,15 @@ class FakeIncomingMessage(base.IncomingMessage):
|
|||||||
|
|
||||||
class FakeListener(base.Listener):
|
class FakeListener(base.Listener):
|
||||||
|
|
||||||
def __init__(self, driver, target, exchange):
|
def __init__(self, driver, exchange, targets):
|
||||||
super(FakeListener, self).__init__(driver, target)
|
super(FakeListener, self).__init__(driver)
|
||||||
self._exchange = exchange
|
self._exchange = exchange
|
||||||
|
self._targets = targets
|
||||||
|
|
||||||
def poll(self):
|
def poll(self):
|
||||||
while True:
|
while True:
|
||||||
(ctxt, message, reply_q) = self._exchange.poll(self.target)
|
for target in self._targets:
|
||||||
|
(ctxt, message, reply_q) = self._exchange.poll(target)
|
||||||
if message is not None:
|
if message is not None:
|
||||||
return FakeIncomingMessage(self, ctxt, message, reply_q)
|
return FakeIncomingMessage(self, ctxt, message, reply_q)
|
||||||
time.sleep(.05)
|
time.sleep(.05)
|
||||||
@ -80,8 +82,9 @@ class FakeExchange(object):
|
|||||||
|
|
||||||
def poll(self, target):
|
def poll(self, target):
|
||||||
with self._queues_lock:
|
with self._queues_lock:
|
||||||
|
if target.server:
|
||||||
queue = self._get_server_queue(target.topic, target.server)
|
queue = self._get_server_queue(target.topic, target.server)
|
||||||
if not queue:
|
else:
|
||||||
queue = self._get_topic_queue(target.topic)
|
queue = self._get_topic_queue(target.topic)
|
||||||
return queue.pop(0) if queue else (None, None, None)
|
return queue.pop(0) if queue else (None, None, None)
|
||||||
|
|
||||||
@ -152,7 +155,11 @@ class FakeDriver(base.BaseDriver):
|
|||||||
exchange = self._get_exchange(target.exchange or
|
exchange = self._get_exchange(target.exchange or
|
||||||
self._default_exchange)
|
self._default_exchange)
|
||||||
|
|
||||||
return FakeListener(self, target, exchange)
|
listener = FakeListener(self, exchange,
|
||||||
|
[messaging.Target(topic=target.topic,
|
||||||
|
server=target.server),
|
||||||
|
messaging.Target(topic=target.topic)])
|
||||||
|
return listener
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
pass
|
pass
|
||||||
|
@ -846,8 +846,8 @@ class ZmqIncomingMessage(base.IncomingMessage):
|
|||||||
|
|
||||||
class ZmqListener(base.Listener):
|
class ZmqListener(base.Listener):
|
||||||
|
|
||||||
def __init__(self, driver, target):
|
def __init__(self, driver):
|
||||||
super(ZmqListener, self).__init__(driver, target)
|
super(ZmqListener, self).__init__(driver)
|
||||||
self.incoming_queue = moves.queue.Queue()
|
self.incoming_queue = moves.queue.Queue()
|
||||||
|
|
||||||
def dispatch(self, ctxt, version, method, namespace, **kwargs):
|
def dispatch(self, ctxt, version, method, namespace, **kwargs):
|
||||||
@ -948,7 +948,7 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
conn = create_connection(self.conf)
|
conn = create_connection(self.conf)
|
||||||
|
|
||||||
listener = ZmqListener(self, target)
|
listener = ZmqListener(self)
|
||||||
|
|
||||||
conn.create_consumer(target.topic, listener)
|
conn.create_consumer(target.topic, listener)
|
||||||
conn.create_consumer('%s.%s' % (target.topic, target.server),
|
conn.create_consumer('%s.%s' % (target.topic, target.server),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user