From 8118fb8fabdaa8ddb68c02042727d35a1362920d Mon Sep 17 00:00:00 2001 From: Andrew Melton Date: Fri, 26 Apr 2013 13:14:08 -0400 Subject: [PATCH] Only ack message if successfully processed --- tests/unit/test_worker.py | 27 +++++++++++++++++++++++++-- worker/worker.py | 2 +- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 661d780..199f0ad 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -96,7 +96,6 @@ class NovaConsumerTestCase(unittest.TestCase): self.mox.VerifyAll() - def test_create_queue_with_queue_args(self): self.mox.StubOutClassWithMocks(kombu, 'Queue') exchange = self.mox.CreateMockAnything() @@ -127,14 +126,38 @@ class NovaConsumerTestCase(unittest.TestCase): args = (routing_key, body_dict) views.process_raw_data(deployment, args, json.dumps(args))\ .AndReturn(raw) + message.ack() self.mox.StubOutWithMock(consumer, '_check_memory', - use_mock_anything=True) + use_mock_anything=True) consumer._check_memory() self.mox.ReplayAll() consumer._process(message) self.assertEqual(consumer.processed, 1) self.mox.VerifyAll() + def test_process_no_raw_dont_ack(self): + deployment = self.mox.CreateMockAnything() + raw = self.mox.CreateMockAnything() + message = self.mox.CreateMockAnything() + + consumer = worker.NovaConsumer('test', None, deployment, True, {}) + routing_key = 'monitor.info' + message.delivery_info = {'routing_key': routing_key} + body_dict = {u'key': u'value'} + message.body = json.dumps(body_dict) + self.mox.StubOutWithMock(views, 'process_raw_data', + use_mock_anything=True) + args = (routing_key, body_dict) + views.process_raw_data(deployment, args, json.dumps(args))\ + .AndReturn(None) + self.mox.StubOutWithMock(consumer, '_check_memory', + use_mock_anything=True) + consumer._check_memory() + self.mox.ReplayAll() + consumer._process(message) + self.assertEqual(consumer.processed, 0) + self.mox.VerifyAll() + def test_run(self): config = { 'name': 'east_coast.prod.global', diff --git a/worker/worker.py b/worker/worker.py index 23accd8..aefdb78 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -89,6 +89,7 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): raw = views.process_raw_data(self.deployment, args, asJson) if raw: self.processed += 1 + message.ack() self._check_memory() @@ -125,7 +126,6 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): self._process(message) except Exception, e: LOG.exception("Problem %s" % e) - message.ack() def continue_running():