diff --git a/releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml b/releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml new file mode 100644 index 000000000..2d19cec12 --- /dev/null +++ b/releasenotes/notes/lazy-queues-in-subscriptions-6bade4a1b8eca3e5.yaml @@ -0,0 +1,8 @@ +--- +features: + - Queues now behave lazy in subscriptions also. So there is no need for + the user to pre-create a queue before creating a subscription for this + queue. Zaqar will create the queue automatically on the subscription + creation request. As before, all subscriptions will continue to stay + active even if the corresponding queue was deleted. + diff --git a/zaqar/api/v2/endpoints.py b/zaqar/api/v2/endpoints.py index 22f966247..547b97dd5 100644 --- a/zaqar/api/v2/endpoints.py +++ b/zaqar/api/v2/endpoints.py @@ -809,6 +809,9 @@ class Endpoints(object): 'options': req._body.get('options'), 'ttl': req._body.get('ttl')} self._validate.subscription_posting(data) + self._validate.queue_identification(queue_name, project_id) + if not self._queue_controller.exists(queue_name, project_id): + self._queue_controller.create(queue_name, project=project_id) created = self._subscription_controller.create(queue_name, subscriber, data['ttl'], @@ -818,11 +821,6 @@ class Endpoints(object): LOG.debug(ex) headers = {'status': 400} return api_utils.error_response(req, ex, headers) - except storage_errors.DoesNotExist as ex: - LOG.debug(ex) - error = _('Queue %s does not exist.') % queue_name - headers = {'status': 404} - return api_utils.error_response(req, ex, headers, error) except storage_errors.ExceptionBase as ex: LOG.exception(ex) error = _('Subscription %s could not be created.') % queue_name diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 66447eaf1..447cdf654 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -53,7 +53,6 @@ class SubscriptionController(base.Subscription): def __init__(self, *args, **kwargs): super(SubscriptionController, self).__init__(*args, **kwargs) self._collection = self.driver.subscriptions_database.subscriptions - self._queue_ctrl = self.driver.queue_controller self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) # NOTE(flwang): MongoDB will automatically delete the subscription # from the subscriptions collection when the subscription's 'e' value @@ -109,8 +108,6 @@ class SubscriptionController(base.Subscription): now_dt = datetime.datetime.utcfromtimestamp(now) expires = now_dt + datetime.timedelta(seconds=ttl) - if not self._queue_ctrl.exists(source, project): - raise errors.QueueDoesNotExist(source, project) try: subscription_id = self._collection.insert({'s': source, 'u': subscriber, diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index c8dd46771..36ae3e9f3 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -19,7 +19,6 @@ import msgpack from oslo_utils import timeutils import redis -from zaqar.common import decorators from zaqar.common import utils as common_utils from zaqar.storage import base from zaqar.storage import errors @@ -53,10 +52,6 @@ class SubscriptionController(base.Subscription): use_bin_type=True).pack self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8') - @decorators.lazy_property(write=False) - def _queue_ctrl(self): - return self.driver.queue_controller - @utils.raises_conn_error @utils.retries_on_connection_error def list(self, queue, project=None, marker=None, limit=10): @@ -117,8 +112,6 @@ class SubscriptionController(base.Subscription): 'o': self._packer(options), 'p': project} - if not self._queue_ctrl.exists(queue, project): - raise errors.QueueDoesNotExist(queue, project) try: # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: diff --git a/zaqar/tests/unit/storage/base.py b/zaqar/tests/unit/storage/base.py index 90028baf8..9d8b3ce13 100644 --- a/zaqar/tests/unit/storage/base.py +++ b/zaqar/tests/unit/storage/base.py @@ -964,6 +964,7 @@ class ClaimControllerTest(ControllerBaseTest): project=self.project) +@ddt.ddt class SubscriptionControllerTest(ControllerBaseTest): """Subscriptions Controller base tests. @@ -974,10 +975,7 @@ class SubscriptionControllerTest(ControllerBaseTest): def setUp(self): super(SubscriptionControllerTest, self).setUp() self.subscription_controller = self.driver.subscription_controller - - # Lets create a queue as the source of subscription self.queue_controller = self.driver.queue_controller - self.queue_controller.create(self.queue_name, project=self.project) self.source = self.queue_name self.subscriber = 'http://trigger.me' @@ -988,7 +986,16 @@ class SubscriptionControllerTest(ControllerBaseTest): self.queue_controller.delete(self.queue_name, project=self.project) super(SubscriptionControllerTest, self).tearDown() - def test_list(self): + # NOTE(Eva-i): this method helps to test cases when the queue is + # pre-created and when it's not. + def _precreate_queue(self, precreate_queue): + if precreate_queue: + # Let's create a queue as the source of subscription + self.queue_controller.create(self.queue_name, project=self.project) + + @ddt.data(True, False) + def test_list(self, precreate_queue): + self._precreate_queue(precreate_queue) for s in six.moves.xrange(15): subscriber = 'http://fake_{0}'.format(s) s_id = self.subscription_controller.create( @@ -1019,14 +1026,18 @@ class SubscriptionControllerTest(ControllerBaseTest): subscriptions))) self.assertEqual(5, len(subscriptions)) - def test_get_raises_if_subscription_does_not_exist(self): + @ddt.data(True, False) + def test_get_raises_if_subscription_does_not_exist(self, precreate_queue): + self._precreate_queue(precreate_queue) self.assertRaises(errors.SubscriptionDoesNotExist, self.subscription_controller.get, self.queue_name, 'notexists', project=self.project) - def test_lifecycle(self): + @ddt.data(True, False) + def test_lifecycle(self, precreate_queue): + self._precreate_queue(precreate_queue) s_id = self.subscription_controller.create(self.source, self.subscriber, self.ttl, @@ -1068,7 +1079,9 @@ class SubscriptionControllerTest(ControllerBaseTest): self.subscription_controller.get, self.queue_name, s_id) - def test_create_existed(self): + @ddt.data(True, False) + def test_create_existed(self, precreate_queue): + self._precreate_queue(precreate_queue) s_id = self.subscription_controller.create( self.source, self.subscriber, @@ -1087,15 +1100,23 @@ class SubscriptionControllerTest(ControllerBaseTest): self.assertIsNone(s_id) def test_nonexist_source(self): - self.assertRaises(errors.QueueDoesNotExist, - self.subscription_controller.create, - 'fake_queue_name', - self.subscriber, - self.ttl, - self.options, - self.project) + try: + s_id = self.subscription_controller.create('fake_queue_name', + self.subscriber, + self.ttl, + self.options, + self.project) + except Exception: + self.fail("Subscription controller should not raise an exception " + "in case of non-existing queue.") + self.addCleanup(self.subscription_controller.delete, self.source, s_id, + self.project) - def test_update_raises_if_try_to_update_to_existing_subscription(self): + @ddt.data(True, False) + def test_update_raises_if_try_to_update_to_existing_subscription( + self, + precreate_queue): + self._precreate_queue(precreate_queue) # create two subscriptions: fake_0 and fake_1 ids = [] for s in six.moves.xrange(2): @@ -1125,7 +1146,10 @@ class SubscriptionControllerTest(ControllerBaseTest): project=self.project, **update_fields) - def test_update_raises_if_subscription_does_not_exist(self): + @ddt.data(True, False) + def test_update_raises_if_subscription_does_not_exist(self, + precreate_queue): + self._precreate_queue(precreate_queue) update_fields = { 'subscriber': 'http://fake' } @@ -1612,7 +1636,6 @@ class FlavorsControllerTest(ControllerBaseTest): def _insert_fixtures(controller, queue_name, project=None, client_uuid=None, num=4, ttl=120): - def messages(): for n in six.moves.xrange(num): yield { diff --git a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py index 2a460558c..1a438fc3a 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py @@ -155,10 +155,24 @@ class SubscriptionTest(base.V1_1Base): req = test_utils.create_request(action, body, self.headers) self.protocol.onMessage(req, False) + [subscriber] = list( + next( + self.boot.storage.subscription_controller.list( + 'shuffle', self.project_id))) + self.addCleanup( + self.boot.storage.subscription_controller.delete, 'shuffle', + subscriber['id'], project=self.project_id) + + response = { + 'body': {'message': 'Subscription shuffle created.', + 'subscription_id': subscriber['id']}, + 'headers': {'status': 201}, + 'request': {'action': 'subscription_create', + 'body': {'queue_name': 'shuffle', 'ttl': 600}, + 'api': 'v2', 'headers': self.headers}} + self.assertEqual(1, sender.call_count) - self.assertEqual( - 'Queue shuffle does not exist.', - json.loads(sender.call_args[0][0])['body']['error']) + self.assertEqual(response, json.loads(sender.call_args[0][0])) def test_subscription_get(self): sub = self.boot.storage.subscription_controller.create( diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index 3445ba216..f5c3eec07 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -100,7 +100,8 @@ def public_endpoints(driver, conf): ('/queues/{queue_name}/subscriptions', subscriptions.CollectionResource(driver._validate, subscription_controller, - defaults.subscription_ttl)), + defaults.subscription_ttl, + queue_controller)), ('/queues/{queue_name}/subscriptions/{subscription_id}', subscriptions.ItemResource(driver._validate, diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 9b4c1ece6..0510a36a3 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -109,13 +109,14 @@ class ItemResource(object): class CollectionResource(object): __slots__ = ('_subscription_controller', '_validate', - '_default_subscription_ttl') + '_default_subscription_ttl', '_queue_controller') def __init__(self, validate, subscription_controller, - default_subscription_ttl): + default_subscription_ttl, queue_controller): self._subscription_controller = subscription_controller self._validate = validate self._default_subscription_ttl = default_subscription_ttl + self._queue_controller = queue_controller @decorators.TransportLog("Subscription collection") @acl.enforce("subscription:get_all") @@ -171,6 +172,8 @@ class CollectionResource(object): document = {} try: + if not self._queue_controller.exists(queue_name, project_id): + self._queue_controller.create(queue_name, project=project_id) self._validate.subscription_posting(document) subscriber = document['subscriber'] ttl = document.get('ttl', self._default_subscription_ttl) @@ -181,9 +184,6 @@ class CollectionResource(object): options, project=project_id) - except storage_errors.QueueDoesNotExist as ex: - LOG.exception(ex) - raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))