From c2ac9b8bf85e7f08d6690a957b5e8b470ff44778 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 21 May 2014 20:44:00 +0000 Subject: [PATCH] publish and consume working --- bin/event_consumer.py | 32 ++++++++++++++++++++++++++++++++ bin/event_pump.py | 5 +++-- 2 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 bin/event_consumer.py diff --git a/bin/event_consumer.py b/bin/event_consumer.py new file mode 100644 index 0000000..bd7cc04 --- /dev/null +++ b/bin/event_consumer.py @@ -0,0 +1,32 @@ +"""The consumer part of event_pump. Uses Notabene to read +published notifications. + +""" + +import logging +import sys + +from notabene import kombu_driver as driver + + +class Callback(object): + x = 0 + def on_event(self, deployment, routing_key, body, exchange): + print "Got:", body + self.x += 1 + if self.x > 10: + sys.exit(1) + + def shutting_down(self): + print "Shutting down" + +config = {"topics":{ + "monitor":[ + {"queue":"monitor.info", + "routing_key":"monitor.info"}, + ] + }} +logging.basicConfig(level=logging.DEBUG) +driver.start_worker(Callback(), "event_consumer", 1, config, + "monitor", logging) + diff --git a/bin/event_pump.py b/bin/event_pump.py index 11c7a58..059ae1d 100644 --- a/bin/event_pump.py +++ b/bin/event_pump.py @@ -18,10 +18,11 @@ connection = driver.create_connection("localhost", 5672, 'guest', 'guest', "librabbitmq", "/") exchange = driver.create_exchange("monitor", "topic") queue_name = "monitor.info" -queue = driver.create_queue(queue_name, exchange, queue_name, channel=connection.channel()) +queue = driver.create_queue(queue_name, exchange, queue_name, + channel=connection.channel()) queue.declare() -g = notigen.EventGenerator(100) # Number of operations per minute +g = notigen.EventGenerator(1000) # Number of operations per minute now = datetime.datetime.utcnow() start = now nevents = 0