
The datetime.utcfromtimestamp() is deprecated in Python 3.12. Replace datetime.utcfromtimestamp() with datetime.fromtimestamp(). Change-Id: I495e783c3a7c37e1b328c5d0920553d0d1eb9352 Signed-off-by: Takashi Natsume <takanattie@gmail.com>
306 lines
13 KiB
Python
306 lines
13 KiB
Python
# Copyright (c) 2015 Catalyst IT Ltd.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import datetime
|
|
|
|
import falcon
|
|
from oslo_log import log as logging
|
|
from oslo_utils import netutils
|
|
from oslo_utils import timeutils
|
|
from stevedore import driver
|
|
|
|
from zaqar.common import decorators
|
|
from zaqar.i18n import _
|
|
from zaqar.notification import notifier
|
|
from zaqar.storage import errors as storage_errors
|
|
from zaqar.transport import acl
|
|
from zaqar.transport import utils
|
|
from zaqar.transport import validation
|
|
from zaqar.transport.wsgi import errors as wsgi_errors
|
|
from zaqar.transport.wsgi import utils as wsgi_utils
|
|
|
|
|
|
# Module-level logger shared by every resource class in this file; it is
# named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class ItemResource(object):
    """WSGI resource for a single subscription of a queue.

    Provides the GET, DELETE and PATCH handlers. Each handler delegates the
    actual work to the subscription storage controller and converts
    storage/validation errors into WSGI HTTP errors.
    """

    __slots__ = ('_validate', '_subscription_controller')

    def __init__(self, validate, subscription_controller):
        """Store the collaborators used by the request handlers.

        :param validate: validator object; ``on_patch`` calls its
            ``subscription_patching`` method.
        :param subscription_controller: storage controller whose ``get``,
            ``delete`` and ``update`` methods are called by the handlers.
        """
        self._validate = validate
        self._subscription_controller = subscription_controller

    @decorators.TransportLog("Subscriptions item")
    @acl.enforce("subscription:get")
    def on_get(self, req, resp, project_id, queue_name, subscription_id):
        """Write the requested subscription to the response as JSON.

        :raises wsgi_errors.HTTPNotFound: if the storage layer raises
            ``DoesNotExist``.
        :raises wsgi_errors.HTTPServiceUnavailable: on any other error
            raised while fetching the subscription.
        """
        try:
            resp_dict = self._subscription_controller.get(queue_name,
                                                          subscription_id,
                                                          project=project_id)

        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))

        except Exception:
            description = _(u'Subscription could not be retrieved.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

        resp.body = utils.to_json(resp_dict)
        # status defaults to 200

    @decorators.TransportLog("Subscriptions item")
    @acl.enforce("subscription:delete")
    def on_delete(self, req, resp, project_id, queue_name, subscription_id):
        """Delete the subscription and answer with HTTP 204.

        :raises wsgi_errors.HTTPServiceUnavailable: if the storage
            controller raises any exception.
        """
        try:
            self._subscription_controller.delete(queue_name,
                                                 subscription_id,
                                                 project=project_id)

        except Exception:
            description = _(u'Subscription could not be deleted.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

        resp.status = falcon.HTTP_204

    @decorators.TransportLog("Subscriptions item")
    @acl.enforce("subscription:update")
    def on_patch(self, req, resp, project_id, queue_name, subscription_id):
        """Update the subscription with the fields of the request body.

        The body is deserialized only when the request declares a content
        length; otherwise an empty document is validated and applied. On
        success the response is HTTP 204 with ``Location`` set to the
        request path.

        :raises wsgi_errors.HTTPNotFound: on ``SubscriptionDoesNotExist``.
        :raises wsgi_errors.HTTPConflict: on ``SubscriptionAlreadyExists``.
        :raises wsgi_errors.HTTPBadRequestAPI: if validation fails.
        :raises falcon.HTTPBadRequest: on any other error.
        """
        if req.content_length:
            document = wsgi_utils.deserialize(req.stream, req.content_length)
        else:
            document = {}

        try:
            self._validate.subscription_patching(document)
            self._subscription_controller.update(queue_name, subscription_id,
                                                 project=project_id,
                                                 **document)
            resp.status = falcon.HTTP_204
            resp.location = req.path
        except storage_errors.SubscriptionDoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except storage_errors.SubscriptionAlreadyExists as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPConflict(str(ex))
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except Exception:
            description = (_(u'Subscription %(subscription_id)s could not be'
                             ' updated.') %
                           dict(subscription_id=subscription_id))
            LOG.exception(description)
            # Pass title/description by keyword: Falcon deprecated
            # positional arguments for its HTTP error classes, while the
            # keyword form is accepted by old and new releases alike.
            raise falcon.HTTPBadRequest(
                title=_('Unable to update subscription'),
                description=description)
|
|
|
|
|
|
class CollectionResource(object):
    """WSGI resource for the collection of subscriptions of a queue.

    ``on_get`` lists the subscriptions of a queue and ``on_post`` creates a
    subscription and sends a confirmation notification for it.
    """

    __slots__ = ('_subscription_controller', '_validate',
                 '_default_subscription_ttl', '_queue_controller',
                 '_conf', '_notification')

    def __init__(self, validate, subscription_controller,
                 default_subscription_ttl, queue_controller, conf):
        """Store the collaborators and create the notifier driver.

        :param validate: validator used for listing and posting requests.
        :param subscription_controller: storage controller for
            subscriptions.
        :param default_subscription_ttl: TTL applied when the posted
            document carries no ``ttl`` key.
        :param queue_controller: storage controller for queues.
        :param conf: configuration object handed to the notifier.
        """
        self._validate = validate
        self._conf = conf
        self._queue_controller = queue_controller
        self._subscription_controller = subscription_controller
        self._default_subscription_ttl = default_subscription_ttl
        self._notification = notifier.NotifierDriver()

    @decorators.TransportLog("Subscriptions collection")
    @acl.enforce("subscription:get_all")
    def on_get(self, req, resp, project_id, queue_name):
        """Write one page of the queue's subscriptions as JSON.

        The body holds the ``subscriptions`` list and a ``links`` list,
        which contains a single ``next`` link when the page is not empty.

        :raises wsgi_errors.HTTPBadRequestAPI: if validation fails.
        :raises wsgi_errors.HTTPServiceUnavailable: on any other error
            raised while listing.
        """
        params = {}

        # NOTE(kgriffs): Storing straight into the dict ensures that
        # we don't clobber default values with None.
        req.get_param('marker', store=params)
        req.get_param_as_int('limit', store=params)

        try:
            self._validate.subscription_listing(**params)
            pages = self._subscription_controller.list(
                queue_name, project=project_id, **params)
            # Buffer list of subscriptions. Can raise NoPoolFound error.
            subscriptions = list(next(pages))
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))

        except Exception:
            description = _(u'Subscriptions could not be listed.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

        # Got some. Prepare the response. The second item produced by the
        # controller is used as the marker; when it is falsy the marker
        # supplied by the client (or '') is kept.
        next_marker = next(pages)
        params['marker'] = next_marker or params.get('marker', '')

        if subscriptions:
            next_href = req.path + falcon.to_query_str(params)
            links = [{'rel': 'next', 'href': next_href}]
        else:
            links = []

        resp.body = utils.to_json({
            'subscriptions': subscriptions,
            'links': links,
        })
        # status defaults to 200

    @decorators.TransportLog("Subscriptions collection")
    @acl.enforce("subscription:create")
    def on_post(self, req, resp, project_id, queue_name):
        """Create a subscription and send its confirmation notification.

        The queue is created first when it does not exist. When the storage
        controller returns a falsy value from ``create``, the existing
        subscription for the same subscriber is looked up instead: if it is
        confirmed the request fails with a conflict, otherwise the
        confirmation notification is sent again. Successful requests answer
        HTTP 201 with the subscription id in the body.

        :raises wsgi_errors.HTTPBadRequestAPI: if validation fails.
        :raises wsgi_errors.HTTPServiceUnavailable: on any other error
            raised while registering or storing the subscription.
        :raises wsgi_errors.HTTPConflict: if a confirmed subscription
            already exists.
        """
        document = (
            wsgi_utils.deserialize(req.stream, req.content_length)
            if req.content_length else {}
        )

        try:
            if not self._queue_controller.exists(queue_name, project_id):
                self._queue_controller.create(queue_name, project=project_id)
            self._validate.subscription_posting(document)

            subscriber = document['subscriber']
            options = document.get('options', {})
            url = netutils.urlsplit(subscriber)
            ttl = document.get('ttl', self._default_subscription_ttl)

            # The notification task driver is selected by the URI scheme of
            # the subscriber.
            task_manager = driver.DriverManager('zaqar.notification.tasks',
                                                url.scheme,
                                                invoke_on_load=True)
            req_data = req.headers.copy()
            req_data.update(req.env)
            task_manager.driver.register(subscriber, options, ttl,
                                         project_id, req_data)

            created = self._subscription_controller.create(
                queue_name, subscriber, ttl, options, project=project_id)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except Exception:
            description = _(u'Subscription could not be created.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

        # Naive UTC expiry timestamp handed to the confirmation notifier.
        now = timeutils.utcnow_ts()
        utc_aware = datetime.datetime.fromtimestamp(
            now, tz=datetime.timezone.utc)
        now_dt = utc_aware.replace(tzinfo=None)
        expires = now_dt + datetime.timedelta(seconds=ttl)
        api_version = req.path.split('/')[1]

        if created:
            subscription = self._subscription_controller.get(queue_name,
                                                             created,
                                                             project_id)
        else:
            subscription = self._subscription_controller.get_with_subscriber(
                queue_name, subscriber, project_id)
            if subscription.get('confirmed', True):
                # NOTE(review): the first two literals concatenate without
                # a separating space ("exists.Subscriptions"); kept as-is
                # because the text is a translation message id.
                description = _(u'Such subscription already exists.'
                                u'Subscriptions are unique by project + queue '
                                u'+ subscriber URI.')
                raise wsgi_errors.HTTPConflict(description,
                                               headers={'location': req.path})
            # The subscription is not confirmed, so the confirm
            # notification is sent again below.

        # send confirm notification
        self._notification.send_confirm_notification(
            queue_name, subscription, self._conf, project_id,
            str(expires), api_version)

        resp.location = req.path
        resp.status = falcon.HTTP_201
        resp.body = utils.to_json(
            {'subscription_id': str(created if created
                                    else subscription['id'])})
|
|
|
|
|
|
class ConfirmResource(object):
    """WSGI resource used to confirm or un-confirm a subscription."""

    __slots__ = ('_subscription_controller', '_validate', '_notification',
                 '_conf')

    def __init__(self, validate, subscription_controller, conf):
        """Store the collaborators and create the notifier driver.

        :param validate: validator object; ``on_put`` calls its
            ``subscription_confirming`` method.
        :param subscription_controller: storage controller whose
            ``confirm`` and ``get`` methods are called by ``on_put``.
        :param conf: configuration object; ``on_put`` reads
            ``conf.transport.default_subscription_ttl`` from it and passes
            it on to the notifier.
        """
        self._subscription_controller = subscription_controller
        self._validate = validate
        self._notification = notifier.NotifierDriver()
        self._conf = conf

    @decorators.TransportLog("Subscriptions confirmation item")
    @acl.enforce("subscription:confirm")
    def on_put(self, req, resp, project_id, queue_name, subscription_id):
        """Apply the ``confirmed`` flag of the request body.

        The flag is forwarded to the storage controller. When it is exactly
        ``False``, a confirmation notification is sent for the subscription
        with an expiry computed from the configured default subscription
        TTL. On success the response is HTTP 204 with ``Location`` set to
        the request path.

        :raises wsgi_errors.HTTPNotFound: on ``SubscriptionDoesNotExist``.
        :raises wsgi_errors.HTTPBadRequestAPI: if validation fails.
        :raises falcon.HTTPBadRequest: on any other error.
        """
        if req.content_length:
            document = wsgi_utils.deserialize(req.stream, req.content_length)
        else:
            document = {}

        try:
            self._validate.subscription_confirming(document)
            confirmed = document.get('confirmed')
            self._subscription_controller.confirm(queue_name, subscription_id,
                                                  project=project_id,
                                                  confirmed=confirmed)
            # Identity check on purpose: a missing flag (None) must not
            # trigger the notification, only an explicit False does.
            if confirmed is False:
                now = timeutils.utcnow_ts()
                # Naive UTC datetime built from the current UTC timestamp.
                now_dt = datetime.datetime.fromtimestamp(
                    now, tz=datetime.timezone.utc).replace(tzinfo=None)
                ttl = self._conf.transport.default_subscription_ttl
                expires = now_dt + datetime.timedelta(seconds=ttl)
                api_version = req.path.split('/')[1]
                sub = self._subscription_controller.get(queue_name,
                                                        subscription_id,
                                                        project=project_id)
                # NOTE(review): the trailing positional True is an option
                # of the notifier defined outside this file; presumably it
                # marks the message as an unsubscribe notice -- confirm
                # against NotifierDriver.send_confirm_notification.
                self._notification.send_confirm_notification(queue_name,
                                                             sub,
                                                             self._conf,
                                                             project_id,
                                                             str(expires),
                                                             api_version,
                                                             True)
            resp.status = falcon.HTTP_204
            resp.location = req.path
        except storage_errors.SubscriptionDoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except Exception:
            description = (_(u'Subscription %(subscription_id)s could not be'
                             ' confirmed.') %
                           dict(subscription_id=subscription_id))
            LOG.exception(description)
            # Pass title/description by keyword: Falcon deprecated
            # positional arguments for its HTTP error classes, while the
            # keyword form is accepted by old and new releases alike.
            raise falcon.HTTPBadRequest(
                title=_('Unable to confirm subscription'),
                description=description)
|