Merge "Make queues lazy in subscriptions"
This commit is contained in:
commit
b43ff612dd
@ -0,0 +1,8 @@
|
||||
---
|
||||
features:
|
||||
- Queues now behave lazy in subscriptions also. So there is no need for
|
||||
the user to pre-create a queue before creating a subscription for this
|
||||
queue. Zaqar will create the queue automatically on the subscription
|
||||
creation request. As before, all subscriptions will continue to stay
|
||||
active even if the corresponding queue was deleted.
|
||||
|
@ -809,6 +809,9 @@ class Endpoints(object):
|
||||
'options': req._body.get('options'),
|
||||
'ttl': req._body.get('ttl')}
|
||||
self._validate.subscription_posting(data)
|
||||
self._validate.queue_identification(queue_name, project_id)
|
||||
if not self._queue_controller.exists(queue_name, project_id):
|
||||
self._queue_controller.create(queue_name, project=project_id)
|
||||
created = self._subscription_controller.create(queue_name,
|
||||
subscriber,
|
||||
data['ttl'],
|
||||
@ -818,11 +821,6 @@ class Endpoints(object):
|
||||
LOG.debug(ex)
|
||||
headers = {'status': 400}
|
||||
return api_utils.error_response(req, ex, headers)
|
||||
except storage_errors.DoesNotExist as ex:
|
||||
LOG.debug(ex)
|
||||
error = _('Queue %s does not exist.') % queue_name
|
||||
headers = {'status': 404}
|
||||
return api_utils.error_response(req, ex, headers, error)
|
||||
except storage_errors.ExceptionBase as ex:
|
||||
LOG.exception(ex)
|
||||
error = _('Subscription %s could not be created.') % queue_name
|
||||
|
@ -53,7 +53,6 @@ class SubscriptionController(base.Subscription):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(SubscriptionController, self).__init__(*args, **kwargs)
|
||||
self._collection = self.driver.subscriptions_database.subscriptions
|
||||
self._queue_ctrl = self.driver.queue_controller
|
||||
self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True)
|
||||
# NOTE(flwang): MongoDB will automatically delete the subscription
|
||||
# from the subscriptions collection when the subscription's 'e' value
|
||||
@ -105,8 +104,6 @@ class SubscriptionController(base.Subscription):
|
||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||
expires = now_dt + datetime.timedelta(seconds=ttl)
|
||||
|
||||
if not self._queue_ctrl.exists(source, project):
|
||||
raise errors.QueueDoesNotExist(source, project)
|
||||
try:
|
||||
subscription_id = self._collection.insert({'s': source,
|
||||
'u': subscriber,
|
||||
|
@ -19,7 +19,6 @@ import msgpack
|
||||
from oslo_utils import timeutils
|
||||
import redis
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.common import utils as common_utils
|
||||
from zaqar.storage import base
|
||||
from zaqar.storage import errors
|
||||
@ -53,10 +52,6 @@ class SubscriptionController(base.Subscription):
|
||||
use_bin_type=True).pack
|
||||
self._unpacker = functools.partial(msgpack.unpackb, encoding='utf-8')
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def _queue_ctrl(self):
|
||||
return self.driver.queue_controller
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def list(self, queue, project=None, marker=None, limit=10):
|
||||
@ -122,8 +117,6 @@ class SubscriptionController(base.Subscription):
|
||||
'o': self._packer(options),
|
||||
'p': project}
|
||||
|
||||
if not self._queue_ctrl.exists(queue, project):
|
||||
raise errors.QueueDoesNotExist(queue, project)
|
||||
try:
|
||||
# Pipeline ensures atomic inserts.
|
||||
with self._client.pipeline() as pipe:
|
||||
|
@ -964,6 +964,7 @@ class ClaimControllerTest(ControllerBaseTest):
|
||||
project=self.project)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class SubscriptionControllerTest(ControllerBaseTest):
|
||||
"""Subscriptions Controller base tests.
|
||||
|
||||
@ -974,10 +975,7 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
def setUp(self):
|
||||
super(SubscriptionControllerTest, self).setUp()
|
||||
self.subscription_controller = self.driver.subscription_controller
|
||||
|
||||
# Lets create a queue as the source of subscription
|
||||
self.queue_controller = self.driver.queue_controller
|
||||
self.queue_controller.create(self.queue_name, project=self.project)
|
||||
|
||||
self.source = self.queue_name
|
||||
self.subscriber = 'http://trigger.me'
|
||||
@ -988,7 +986,16 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
self.queue_controller.delete(self.queue_name, project=self.project)
|
||||
super(SubscriptionControllerTest, self).tearDown()
|
||||
|
||||
def test_list(self):
|
||||
# NOTE(Eva-i): this method helps to test cases when the queue is
|
||||
# pre-created and when it's not.
|
||||
def _precreate_queue(self, precreate_queue):
|
||||
if precreate_queue:
|
||||
# Let's create a queue as the source of subscription
|
||||
self.queue_controller.create(self.queue_name, project=self.project)
|
||||
|
||||
@ddt.data(True, False)
|
||||
def test_list(self, precreate_queue):
|
||||
self._precreate_queue(precreate_queue)
|
||||
for s in six.moves.xrange(15):
|
||||
subscriber = 'http://fake_{0}'.format(s)
|
||||
s_id = self.subscription_controller.create(
|
||||
@ -1022,14 +1029,18 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
subscriptions)))
|
||||
self.assertEqual(5, len(subscriptions))
|
||||
|
||||
def test_get_raises_if_subscription_does_not_exist(self):
|
||||
@ddt.data(True, False)
|
||||
def test_get_raises_if_subscription_does_not_exist(self, precreate_queue):
|
||||
self._precreate_queue(precreate_queue)
|
||||
self.assertRaises(errors.SubscriptionDoesNotExist,
|
||||
self.subscription_controller.get,
|
||||
self.queue_name,
|
||||
'notexists',
|
||||
project=self.project)
|
||||
|
||||
def test_lifecycle(self):
|
||||
@ddt.data(True, False)
|
||||
def test_lifecycle(self, precreate_queue):
|
||||
self._precreate_queue(precreate_queue)
|
||||
s_id = self.subscription_controller.create(self.source,
|
||||
self.subscriber,
|
||||
self.ttl,
|
||||
@ -1073,7 +1084,9 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
self.subscription_controller.get,
|
||||
self.queue_name, s_id)
|
||||
|
||||
def test_create_existed(self):
|
||||
@ddt.data(True, False)
|
||||
def test_create_existed(self, precreate_queue):
|
||||
self._precreate_queue(precreate_queue)
|
||||
s_id = self.subscription_controller.create(
|
||||
self.source,
|
||||
self.subscriber,
|
||||
@ -1092,15 +1105,23 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
self.assertIsNone(s_id)
|
||||
|
||||
def test_nonexist_source(self):
|
||||
self.assertRaises(errors.QueueDoesNotExist,
|
||||
self.subscription_controller.create,
|
||||
'fake_queue_name',
|
||||
self.subscriber,
|
||||
self.ttl,
|
||||
self.options,
|
||||
self.project)
|
||||
try:
|
||||
s_id = self.subscription_controller.create('fake_queue_name',
|
||||
self.subscriber,
|
||||
self.ttl,
|
||||
self.options,
|
||||
self.project)
|
||||
except Exception:
|
||||
self.fail("Subscription controller should not raise an exception "
|
||||
"in case of non-existing queue.")
|
||||
self.addCleanup(self.subscription_controller.delete, self.source, s_id,
|
||||
self.project)
|
||||
|
||||
def test_update_raises_if_try_to_update_to_existing_subscription(self):
|
||||
@ddt.data(True, False)
|
||||
def test_update_raises_if_try_to_update_to_existing_subscription(
|
||||
self,
|
||||
precreate_queue):
|
||||
self._precreate_queue(precreate_queue)
|
||||
# create two subscriptions: fake_0 and fake_1
|
||||
ids = []
|
||||
for s in six.moves.xrange(2):
|
||||
@ -1130,7 +1151,10 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
project=self.project,
|
||||
**update_fields)
|
||||
|
||||
def test_update_raises_if_subscription_does_not_exist(self):
|
||||
@ddt.data(True, False)
|
||||
def test_update_raises_if_subscription_does_not_exist(self,
|
||||
precreate_queue):
|
||||
self._precreate_queue(precreate_queue)
|
||||
update_fields = {
|
||||
'subscriber': 'http://fake'
|
||||
}
|
||||
@ -1617,7 +1641,6 @@ class FlavorsControllerTest(ControllerBaseTest):
|
||||
|
||||
def _insert_fixtures(controller, queue_name, project=None,
|
||||
client_uuid=None, num=4, ttl=120):
|
||||
|
||||
def messages():
|
||||
for n in six.moves.xrange(num):
|
||||
yield {
|
||||
|
@ -159,10 +159,24 @@ class SubscriptionTest(base.V1_1Base):
|
||||
req = test_utils.create_request(action, body, self.headers)
|
||||
self.protocol.onMessage(req, False)
|
||||
|
||||
[subscriber] = list(
|
||||
next(
|
||||
self.boot.storage.subscription_controller.list(
|
||||
'shuffle', self.project_id)))
|
||||
self.addCleanup(
|
||||
self.boot.storage.subscription_controller.delete, 'shuffle',
|
||||
subscriber['id'], project=self.project_id)
|
||||
|
||||
response = {
|
||||
'body': {'message': 'Subscription shuffle created.',
|
||||
'subscription_id': subscriber['id']},
|
||||
'headers': {'status': 201},
|
||||
'request': {'action': 'subscription_create',
|
||||
'body': {'queue_name': 'shuffle', 'ttl': 600},
|
||||
'api': 'v2', 'headers': self.headers}}
|
||||
|
||||
self.assertEqual(1, sender.call_count)
|
||||
self.assertEqual(
|
||||
'Queue shuffle does not exist.',
|
||||
json.loads(sender.call_args[0][0])['body']['error'])
|
||||
self.assertEqual(response, json.loads(sender.call_args[0][0]))
|
||||
|
||||
def test_subscription_get(self):
|
||||
sub = self.boot.storage.subscription_controller.create(
|
||||
|
@ -100,7 +100,8 @@ def public_endpoints(driver, conf):
|
||||
('/queues/{queue_name}/subscriptions',
|
||||
subscriptions.CollectionResource(driver._validate,
|
||||
subscription_controller,
|
||||
defaults.subscription_ttl)),
|
||||
defaults.subscription_ttl,
|
||||
queue_controller)),
|
||||
|
||||
('/queues/{queue_name}/subscriptions/{subscription_id}',
|
||||
subscriptions.ItemResource(driver._validate,
|
||||
|
@ -109,13 +109,14 @@ class ItemResource(object):
|
||||
class CollectionResource(object):
|
||||
|
||||
__slots__ = ('_subscription_controller', '_validate',
|
||||
'_default_subscription_ttl')
|
||||
'_default_subscription_ttl', '_queue_controller')
|
||||
|
||||
def __init__(self, validate, subscription_controller,
|
||||
default_subscription_ttl):
|
||||
default_subscription_ttl, queue_controller):
|
||||
self._subscription_controller = subscription_controller
|
||||
self._validate = validate
|
||||
self._default_subscription_ttl = default_subscription_ttl
|
||||
self._queue_controller = queue_controller
|
||||
|
||||
@decorators.TransportLog("Subscription collection")
|
||||
@acl.enforce("subscription:get_all")
|
||||
@ -171,6 +172,8 @@ class CollectionResource(object):
|
||||
document = {}
|
||||
|
||||
try:
|
||||
if not self._queue_controller.exists(queue_name, project_id):
|
||||
self._queue_controller.create(queue_name, project=project_id)
|
||||
self._validate.subscription_posting(document)
|
||||
subscriber = document['subscriber']
|
||||
ttl = document.get('ttl', self._default_subscription_ttl)
|
||||
@ -181,9 +184,6 @@ class CollectionResource(object):
|
||||
options,
|
||||
project=project_id)
|
||||
|
||||
except storage_errors.QueueDoesNotExist as ex:
|
||||
LOG.exception(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
except validation.ValidationFailed as ex:
|
||||
LOG.debug(ex)
|
||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||
|
Loading…
x
Reference in New Issue
Block a user