Merge pull request #178 from ramielrowe/master

Adding queue name to worker topic config
This commit is contained in:
Andrew Melton 2013-08-22 06:54:06 -07:00
commit 45fea2a56d
3 changed files with 51 additions and 29 deletions

View File

@ -10,8 +10,26 @@
"exit_on_exception": true,
"queue_name": "stacktach",
"topics": {
"nova": ["monitor.info", "monitor.error"],
"glance": ["monitor_glance.info", "monitor_glance.error"]
"nova": [
{
"queue": "monitor.info",
"routing_key": "monitor.info"
},
{
"queue": "monitor.error",
"routing_key": "monitor.error"
}
],
"glance": [
{
"queue": "stacktach_monitor_glance.info",
"routing_key": "monitor_glance.info"
},
{
"queue": "stacktach_monitor_glance.error",
"routing_key": "monitor_glance.error"
},
]
}
},
{
@ -25,8 +43,16 @@
"exit_on_exception": false,
"queue_name": "stacktach",
"topics": {
"nova": ["monitor.info", "monitor.error"],
"glance": ["monitor_glance.info", "monitor_glance.error"]
"nova": [
{
"queue": "monitor.info",
"routing_key": "monitor.info"
},
{
"queue": "monitor.error",
"routing_key": "monitor.error"
}
]
}
}]
}

View File

@ -36,6 +36,12 @@ class ConsumerTestCase(StacktachBaseTestCase):
def tearDown(self):
self.mox.UnsetStubs()
def _test_topics(self):
return [
dict(queue="queue1", routing_key="monitor.info"),
dict(queue="queue2", routing_key="monitor.error")
]
def test_get_consumers(self):
created_queues = []
created_callbacks = []
@ -49,15 +55,14 @@ class ConsumerTestCase(StacktachBaseTestCase):
self.mox.StubOutWithMock(worker.Consumer, '_create_exchange')
self.mox.StubOutWithMock(worker.Consumer, '_create_queue')
consumer = worker.Consumer('test', None, None, True, {}, "nova",
["monitor.info", "monitor.error"],
"stacktach_")
self._test_topics())
exchange = self.mox.CreateMockAnything()
consumer._create_exchange('nova', 'topic').AndReturn(exchange)
info_queue = self.mox.CreateMockAnything()
error_queue = self.mox.CreateMockAnything()
consumer._create_queue('stacktach_nova', exchange, 'monitor.info')\
consumer._create_queue('queue1', exchange, 'monitor.info')\
.AndReturn(info_queue)
consumer._create_queue('stacktach_nova', exchange, 'monitor.error')\
consumer._create_queue('queue2', exchange, 'monitor.error')\
.AndReturn(error_queue)
self.mox.ReplayAll()
consumers = consumer.get_consumers(Consumer, None)
@ -73,8 +78,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
def test_create_exchange(self):
args = {'key': 'value'}
consumer = worker.Consumer('test', None, None, True, args, 'nova',
["monitor.info", "monitor.error"],
"stacktach_")
self._test_topics())
self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange')
exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False,
@ -91,8 +95,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
exclusive=False, routing_key='routing.key',
queue_arguments={})
consumer = worker.Consumer('test', None, None, True, {}, 'nova',
["monitor.info", "monitor.error"],
"stacktach_")
self._test_topics())
self.mox.ReplayAll()
actual_queue = consumer._create_queue('name', exchange, 'routing.key',
exclusive=False,
@ -109,8 +112,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
exclusive=False, routing_key='routing.key',
queue_arguments=queue_args)
consumer = worker.Consumer('test', None, None, True, queue_args,
'nova', ["monitor.info", "monitor.error"],
"stacktach_")
'nova', self._test_topics())
self.mox.ReplayAll()
actual_queue = consumer._create_queue('name', exchange, 'routing.key',
exclusive=False,
@ -126,8 +128,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
exchange = 'nova'
consumer = worker.Consumer('test', None, deployment, True, {},
exchange, ["monitor.info", "monitor.error"],
"stacktach_")
exchange, self._test_topics())
routing_key = 'monitor.info'
message.delivery_info = {'routing_key': routing_key}
body_dict = {u'key': u'value'}
@ -165,7 +166,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
'rabbit_password': 'rabbit',
'rabbit_virtual_host': '/',
"services": ["nova"],
"topics": {"nova": ["monitor.info", "monitor.error"]}
"topics": {"nova": self._test_topics()}
}
self.mox.StubOutWithMock(db, 'get_or_create_deployment')
deployment = self.mox.CreateMockAnything()
@ -188,8 +189,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
exchange = 'nova'
consumer = worker.Consumer(config['name'], conn, deployment,
config['durable_queue'], {}, exchange,
["monitor.info", "monitor.error"],
"stacktach_")
self._test_topics())
consumer.run()
worker.continue_running().AndReturn(False)
self.mox.ReplayAll()
@ -208,7 +208,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
'queue_arguments': {'x-ha-policy': 'all'},
'queue_name_prefix': "test_name_",
"services": ["nova"],
"topics": {"nova": ["monitor.info", "monitor.error"]}
"topics": {"nova": self._test_topics()}
}
self.mox.StubOutWithMock(db, 'get_or_create_deployment')
deployment = self.mox.CreateMockAnything()
@ -232,8 +232,7 @@ class ConsumerTestCase(StacktachBaseTestCase):
consumer = worker.Consumer(config['name'], conn, deployment,
config['durable_queue'],
config['queue_arguments'], exchange,
["monitor.info", "monitor.error"],
"test_name_")
self._test_topics())
consumer.run()
worker.continue_running().AndReturn(False)
self.mox.ReplayAll()

View File

@ -44,7 +44,7 @@ LOG = stacklog.get_logger()
class Consumer(kombu.mixins.ConsumerMixin):
def __init__(self, name, connection, deployment, durable, queue_arguments,
exchange, topics, queue_name_prefix):
exchange, topics):
self.connection = connection
self.deployment = deployment
self.durable = durable
@ -56,7 +56,6 @@ class Consumer(kombu.mixins.ConsumerMixin):
self.total_processed = 0
self.topics = topics
self.exchange = exchange
self.queue_name_prefix = queue_name_prefix
def _create_exchange(self, name, type, exclusive=False, auto_delete=False):
return message_service.create_exchange(name, exchange_type=type, exclusive=exclusive,
@ -73,8 +72,8 @@ class Consumer(kombu.mixins.ConsumerMixin):
def get_consumers(self, Consumer, channel):
exchange = self._create_exchange(self.exchange, "topic")
queue_name = "%s%s" % (self.queue_name_prefix, self.exchange)
queues = [self._create_queue(queue_name, exchange, topic)
queues = [self._create_queue(topic['queue'], exchange,
topic['routing_key'])
for topic in self.topics]
return [Consumer(queues=queues, callbacks=[self.on_nova])]
@ -154,7 +153,6 @@ def run(deployment_config, exchange):
queue_arguments = deployment_config.get('queue_arguments', {})
exit_on_exception = deployment_config.get('exit_on_exception', False)
topics = deployment_config.get('topics', {})
queue_name_prefix = deployment_config.get('queue_name_prefix', 'stacktach_')
deployment, new = db.get_or_create_deployment(name)
@ -177,8 +175,7 @@ def run(deployment_config, exchange):
try:
consumer = Consumer(name, conn, deployment, durable,
queue_arguments, exchange,
topics[exchange],
queue_name_prefix)
topics[exchange])
consumer.run()
except Exception as e:
LOG.error("!!!!Exception!!!!")