This commit is contained in:
Sandy Walsh 2012-02-12 12:50:40 -08:00
parent 2b2dbb3928
commit dc4e77017e

View File

@ -18,9 +18,10 @@ scheduler_exchange = kombu.entity.Exchange("scheduler_fanout", type="fanout",
scheduler_queues = [ scheduler_queues = [
# The Queue name has to be unique or we we'll end up with Round Robin # The Queue name has to be unique or we we'll end up with Round Robin
# behavior from Rabbit, even though it's a Fanout queue. In Nova the queues # behavior from Rabbit, even though it's a Fanout queue. In Nova the
# have UUID's tacked on the end. # queues have UUID's tacked on the end.
kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False, auto_delete=False), kombu.Queue("scheduler.xxx", scheduler_exchange, durable=False,
auto_delete=False),
] ]
nova_exchange = kombu.entity.Exchange("nova", type="topic", nova_exchange = kombu.entity.Exchange("nova", type="topic",
@ -44,7 +45,8 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
self.connection = connection self.connection = connection
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
return [Consumer(queues=scheduler_queues, callbacks=[self.on_scheduler]), return [Consumer(queues=scheduler_queues,
callbacks=[self.on_scheduler]),
Consumer(queues=nova_queues, callbacks=[self.on_nova])] Consumer(queues=nova_queues, callbacks=[self.on_nova])]
def _process(self, body, message): def _process(self, body, message):
@ -58,7 +60,7 @@ class SchedulerFanoutConsumer(kombu.mixins.ConsumerMixin):
req = urllib2.Request(url, cooked_data) req = urllib2.Request(url, cooked_data)
response = urllib2.urlopen(req) response = urllib2.urlopen(req)
page = response.read() page = response.read()
print "sent", page print page
except urllib2.HTTPError, e: except urllib2.HTTPError, e:
print e print e
page = e.read() page = e.read()
@ -82,8 +84,8 @@ if __name__ == "__main__":
virtual_host=RABBIT_VIRTUAL_HOST) virtual_host=RABBIT_VIRTUAL_HOST)
with kombu.connection.BrokerConnection(**params) as conn: with kombu.connection.BrokerConnection(**params) as conn:
consumer = SchedulerFanoutConsumer(conn) consumer = SchedulerFanoutConsumer(conn)
try: try:
consumer.run() consumer.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print("bye bye") print("bye bye")