Remove api v1 support 2

This patch removes the v1 API code from the Zaqar tree.

Change-Id: I6bc438788018f4183a0afed93214773739ee0be1
This commit is contained in:
hwang 2024-12-31 10:43:02 -08:00
parent 4e5d6e5301
commit fb0b6d6785
27 changed files with 369 additions and 3941 deletions

View File

@ -1,388 +0,0 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.common.api import api
from zaqar.common import consts
class RequestSchema(api.Api):
    """JSON-schema validation rules for v1 API requests.

    Exposes two class attributes consumed by the transport layer:

    * ``headers`` -- the sub-schema describing every header a request
      may carry.
    * ``schema`` -- one entry per API action, keyed by action name,
      describing the expected ``headers`` and (where applicable) ``body``
      of that action.  Entries marked ``'admin': True`` are restricted to
      the admin API.

    The original file spelled out all fifteen project-scoped operations
    as near-identical literals; they are built here by a small helper to
    remove the duplication.  The resulting ``schema`` dict is equal to
    the hand-written one, entry for entry and in the same insertion
    order.
    """

    headers = {
        'User-Agent': {'type': 'string'},
        'Date': {'type': 'string'},
        'Accept': {'type': 'string'},
        'Client-ID': {'type': 'string'},
        'X-Project-ID': {'type': 'string'},
        'X-Auth-Token': {'type': 'string'}
    }

    # Shorthand property sub-schemas reused across entries.
    _str = {'type': 'string'}
    _int = {'type': 'integer'}
    _bool = {'type': 'boolean'}
    _array = {'type': 'array'}

    def _op(action, body_props, body_req=None, with_body=True, admin=False,
            headers=headers):
        """Build the schema entry for one project-scoped operation.

        ``body_props`` maps body field names to their sub-schemas;
        ``body_req`` lists which of them are mandatory (omitted when
        None, as for queue listing).  ``with_body`` controls whether the
        request body itself is required.  ``headers`` is bound at class
        creation time (class-scope names are not visible inside methods).
        """
        body = {'type': 'object', 'properties': body_props}
        if body_req is not None:
            body['required'] = body_req

        entry = {
            'properties': {
                'action': {'enum': [action]},
                'headers': {
                    'type': 'object',
                    'properties': headers,
                    'required': ['Client-ID', 'X-Project-ID']
                },
                'body': body
            },
        }
        entry['required'] = (['action', 'headers', 'body'] if with_body
                             else ['action', 'headers'])
        if admin:
            entry['admin'] = True
        return entry

    schema = {}

    # Base: admin-only housekeeping actions -- header validation only,
    # no body, no mandatory headers.
    for _action in ('get_home_doc', 'check_node_health', 'ping_node'):
        schema[_action] = {
            'properties': {
                'action': {'enum': [_action]},
                'headers': {
                    'type': 'object',
                    'properties': headers,
                }
            },
            'required': ['action', 'headers'],
            'admin': True,
        }

    # Authentication requires the identity headers but no body.
    schema['authenticate'] = {
        'properties': {
            'action': {'enum': ['authenticate']},
            'headers': {
                'type': 'object',
                'properties': headers,
                'required': ['X-Project-ID', 'X-Auth-Token']
            }
        },
        'required': ['action', 'headers'],
    }

    # Queues
    schema[consts.QUEUE_LIST] = _op(
        consts.QUEUE_LIST,
        {'marker': _str, 'limit': _int, 'detailed': _bool},
        with_body=False)
    schema[consts.QUEUE_CREATE] = _op(
        consts.QUEUE_CREATE, {'queue_name': _str}, ['queue_name'])
    schema[consts.QUEUE_DELETE] = _op(
        consts.QUEUE_DELETE, {'queue_name': _str}, ['queue_name'])
    schema[consts.QUEUE_GET] = _op(
        consts.QUEUE_GET, {'queue_name': _str}, ['queue_name'])
    schema[consts.QUEUE_GET_STATS] = _op(
        consts.QUEUE_GET_STATS, {'queue_name': _str}, ['queue_name'],
        admin=True)

    # Messages
    schema[consts.MESSAGE_LIST] = _op(
        consts.MESSAGE_LIST,
        {'queue_name': _str, 'marker': _str, 'limit': _int,
         'echo': _bool, 'include_claimed': _bool},
        ['queue_name'])
    schema[consts.MESSAGE_GET] = _op(
        consts.MESSAGE_GET,
        {'queue_name': _str, 'message_id': _str},
        ['queue_name', 'message_id'])
    schema[consts.MESSAGE_GET_MANY] = _op(
        consts.MESSAGE_GET_MANY,
        {'queue_name': _str, 'message_ids': _array},
        ['queue_name', 'message_ids'])
    schema[consts.MESSAGE_POST] = _op(
        consts.MESSAGE_POST,
        {'queue_name': _str, 'messages': _array},
        ['queue_name', 'messages'])
    schema[consts.MESSAGE_DELETE] = _op(
        consts.MESSAGE_DELETE,
        {'queue_name': _str, 'message_id': _str, 'claim_id': _str},
        ['queue_name', 'message_id'])
    schema[consts.MESSAGE_DELETE_MANY] = _op(
        consts.MESSAGE_DELETE_MANY,
        {'queue_name': _str, 'message_ids': _array,
         'claim_ids': _array, 'pop': _int},
        ['queue_name'])

    # Claims
    schema[consts.CLAIM_CREATE] = _op(
        consts.CLAIM_CREATE,
        {'queue_name': _str, 'limit': _int, 'ttl': _int, 'grace': _int},
        ['queue_name'])
    schema[consts.CLAIM_GET] = _op(
        consts.CLAIM_GET,
        {'queue_name': _str, 'claim_id': _str},
        ['queue_name', 'claim_id'])
    schema[consts.CLAIM_UPDATE] = _op(
        consts.CLAIM_UPDATE,
        {'queue_name': _str, 'claim_id': _str, 'ttl': _int},
        ['queue_name', 'claim_id'])
    schema[consts.CLAIM_DELETE] = _op(
        consts.CLAIM_DELETE,
        {'queue_name': _str, 'claim_id': _str},
        ['queue_name', 'claim_id'])

    # Drop the construction scaffolding so the public surface stays
    # exactly ``headers`` and ``schema``.
    del _action, _op, _str, _int, _bool, _array

View File

@ -1,301 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.common.api import api
from zaqar.common import consts
class ResponseSchema(api.Api):
    """Define validation schema for json response."""

    def __init__(self, limits):
        """Build the per-action response schemas.

        :param limits: transport limits object; its ``max_message_ttl``,
            ``max_claim_ttl`` and per-page maxima bound the numeric
            checks below.
        """
        self.limits = limits

        # Shared sub-schema: a non-negative age in seconds.
        age = {
            "type": "number",
            "minimum": 0
        }

        # A plain (unclaimed) message resource.
        message = {
            "type": "object",
            "properties": {
                "href": {
                    "type": "string",
                    "pattern": r"^(/v1/queues/[a-zA-Z0-9_-]{1,64}"
                               r"/messages/[a-zA-Z0-9_-]+)$"
                },
                "age": age,
                "ttl": {
                    "type": "number",
                    "minimum": 1,
                    "maximum": self.limits.max_message_ttl
                },
                "body": {
                    "type": "object"
                }
            },
            "required": ["href", "ttl", "age", "body"],
            "additionalProperties": False,
        }

        # href of a message that belongs to a claim: the claim_id rides
        # along as a query parameter.
        claim_href = {
            "type": "string",
            "pattern": r"^(/v1/queues/[a-zA-Z0-9_-]{1,64}"
                       r"/messages/[a-zA-Z0-9_-]+)"
                       r"\?claim_id=[a-zA-Z0-9_-]+$"
        }

        # Same shape as ``message`` but with the claim-scoped href.
        # (The original file spelled this out twice, once under
        # CLAIM_CREATE and once under CLAIM_GET.)
        claimed_message = {
            "type": "object",
            "properties": {
                "href": claim_href,
                "ttl": {
                    "type": "number",
                    "minimum": 1,
                    "maximum": self.limits.max_message_ttl
                },
                "age": age,
                "body": {
                    "type": "object"
                }
            },
            "required": ["href", "ttl", "age", "body"],
            "additionalProperties": False,
        }

        self.schema = {
            consts.QUEUE_LIST: {
                'type': 'object',
                'properties': {
                    'links': {
                        'type': 'array',
                        'items': {
                            'type': 'object',
                            'properties': {
                                'rel': {
                                    'type': 'string',
                                    'enum': ['next'],
                                },
                                'href': {
                                    'type': 'string',
                                    "pattern": r"^/v1/queues\?",
                                }
                            },
                            'required': ['rel', 'href'],
                            'additionalProperties': False,
                        },
                        'minItems': 1,
                        'maxItems': 1,
                    },
                    'queues': {
                        'type': 'array',
                        'items': {
                            'type': 'object',
                            'properties': {
                                'name': {
                                    'type': 'string',
                                    'pattern': r'^[a-zA-Z0-9_-]{1,64}$'
                                },
                                'href': {
                                    'type': 'string',
                                    'pattern': r'^/v1/queues/'
                                               r'[a-zA-Z0-9_-]{1,64}$',
                                },
                                'metadata': {
                                    'type': 'object',
                                }
                            },
                            'required': ['name', 'href'],
                            'additionalProperties': False,
                        },
                        'minItems': 1,
                        'maxItems': self.limits.max_queues_per_page,
                    }
                },
                'required': ['links', 'queues'],
                'additionalProperties': False,
            },

            consts.QUEUE_GET_STATS: {
                'type': 'object',
                'properties': {
                    'messages': {
                        'type': 'object',
                        'properties': {
                            'free': {
                                'type': 'number',
                                'minimum': 0
                            },
                            'claimed': {
                                'type': 'number',
                                'minimum': 0
                            },
                            'total': {
                                'type': 'number',
                                'minimum': 0
                            },
                            'oldest': {
                                'type': 'object'
                            },
                            'newest': {
                                'type': 'object'
                            }
                        },
                        'required': ['free', 'claimed', 'total'],
                        'additionalProperties': False
                    }
                },
                'required': ['messages'],
                'additionalProperties': False
            },

            consts.POOL_LIST: {
                'type': 'object',
                'properties': {
                    'links': {
                        'type': 'array',
                        'items': {
                            'type': 'object',
                            'properties': {
                                'rel': {
                                    'type': 'string'
                                },
                                'href': {
                                    'type': 'string',
                                    'pattern': r'^/v1/pools\?'
                                }
                            },
                            'required': ['rel', 'href'],
                            'additionalProperties': False
                        }
                    },
                    'pools': {
                        'type': 'array',
                        'items': {
                            'type': 'object',
                            'properties': {
                                'href': {
                                    'type': 'string',
                                    'pattern': r'^/v1/'
                                               r'pools/[a-zA-Z0-9_-]{1,64}$'
                                },
                                'weight': {
                                    'type': 'number',
                                    'minimum': -1
                                },
                                'name': {
                                    'type': 'string'
                                },
                                'uri': {
                                    'type': 'string'
                                },
                                'options': {
                                    'type': 'object',
                                    'additionalProperties': True
                                }
                            },
                            'required': ['href', 'weight', 'uri'],
                            'additionalProperties': False,
                        },
                    }
                },
                'required': ['links', 'pools'],
                'additionalProperties': False
            },

            consts.MESSAGE_LIST: {
                'type': 'object',
                'properties': {
                    'links': {
                        'type': 'array',
                        'items': {
                            'type': 'object',
                            'properties': {
                                'rel': {
                                    'type': 'string'
                                },
                                'href': {
                                    'type': 'string',
                                    'pattern': r'^/v1/queues/[a-zA-Z0-9_-]+'
                                               r'/messages\?(.)*$'
                                }
                            },
                            'required': ['rel', 'href'],
                            'additionalProperties': False
                        }
                    },
                    'messages': {
                        "type": "array",
                        "items": message,
                        "minItems": 1,
                        # NOTE(review): this bound looks like it should be
                        # max_messages_per_page; kept as found.
                        "maxItems": self.limits.max_messages_per_claim_or_pop
                    }
                }
            },

            consts.MESSAGE_GET_MANY: {
                "type": "array",
                "items": message,
                "minItems": 1,
                "maxItems": self.limits.max_messages_per_page
            },

            consts.CLAIM_CREATE: {
                "type": "array",
                "items": claimed_message,
                "minItems": 1,
                "maxItems": self.limits.max_messages_per_page
            },

            consts.CLAIM_GET: {
                'type': 'object',
                'properties': {
                    'age': age,
                    'ttl': {
                        'type': 'number',
                        'minimum': 0,
                        'maximum': self.limits.max_claim_ttl
                    },
                    'href': {
                        'type': 'string',
                        'pattern': r'^/v1/queues/[a-zA-Z0-9_-]+'
                                   r'/claims/[a-zA-Z0-9_-]+$'
                    },
                    'messages': {
                        "type": "array",
                        "items": claimed_message,
                        "minItems": 1,
                        "maxItems": self.limits.max_messages_per_page
                    }
                },
                'required': ['age', 'ttl', 'messages', 'href'],
                'additionalProperties': False
            }
        }

View File

@ -13,16 +13,378 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from zaqar.api.v1 import request as v1
from zaqar.common.api import api
from zaqar.common import consts
class RequestSchema(v1.RequestSchema):
class RequestSchema(api.Api):
headers = v1.RequestSchema.headers
schema = v1.RequestSchema.schema
headers = {
'User-Agent': {'type': 'string'},
'Date': {'type': 'string'},
'Accept': {'type': 'string'},
'Client-ID': {'type': 'string'},
'X-Project-ID': {'type': 'string'},
'X-Auth-Token': {'type': 'string'}
}
schema.update({
schema = {
# Base
'get_home_doc': {
'properties': {
'action': {'enum': ['get_home_doc']},
'headers': {
'type': 'object',
'properties': headers,
}
},
'required': ['action', 'headers'],
'admin': True,
},
'check_node_health': {
'properties': {
'action': {'enum': ['check_node_health']},
'headers': {
'type': 'object',
'properties': headers,
}
},
'required': ['action', 'headers'],
'admin': True,
},
'ping_node': {
'properties': {
'action': {'enum': ['ping_node']},
'headers': {
'type': 'object',
'properties': headers,
}
},
'required': ['action', 'headers'],
'admin': True,
},
'authenticate': {
'properties': {
'action': {'enum': ['authenticate']},
'headers': {
'type': 'object',
'properties': headers,
'required': ['X-Project-ID', 'X-Auth-Token']
}
},
'required': ['action', 'headers'],
},
# Queues
consts.QUEUE_LIST: {
'properties': {
'action': {'enum': [consts.QUEUE_LIST]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'marker': {'type': 'string'},
'limit': {'type': 'integer'},
'detailed': {'type': 'boolean'}
}
}
},
'required': ['action', 'headers']
},
consts.QUEUE_CREATE: {
'properties': {
'action': {'enum': [consts.QUEUE_CREATE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body']
},
consts.QUEUE_DELETE: {
'properties': {
'action': {'enum': [consts.QUEUE_DELETE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
},
'required': ['queue_name']
}
},
'required': ['action', 'headers', 'body']
},
consts.QUEUE_GET: {
'properties': {
'action': {'enum': [consts.QUEUE_GET]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body']
},
consts.QUEUE_GET_STATS: {
'properties': {
'action': {'enum': [consts.QUEUE_GET_STATS]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body'],
'admin': True
},
# Messages
consts.MESSAGE_LIST: {
'properties': {
'action': {'enum': [consts.MESSAGE_LIST]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'marker': {'type': 'string'},
'limit': {'type': 'integer'},
'echo': {'type': 'boolean'},
'include_claimed': {'type': 'boolean'},
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body']
},
consts.MESSAGE_GET: {
'properties': {
'action': {'enum': [consts.MESSAGE_GET]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'message_id': {'type': 'string'},
},
'required': ['queue_name', 'message_id'],
}
},
'required': ['action', 'headers', 'body']
},
consts.MESSAGE_GET_MANY: {
'properties': {
'action': {'enum': [consts.MESSAGE_GET_MANY]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'message_ids': {'type': 'array'},
},
'required': ['queue_name', 'message_ids'],
}
},
'required': ['action', 'headers', 'body']
},
consts.MESSAGE_POST: {
'properties': {
'action': {'enum': [consts.MESSAGE_POST]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'messages': {'type': 'array'},
},
'required': ['queue_name', 'messages'],
}
},
'required': ['action', 'headers', 'body']
},
consts.MESSAGE_DELETE: {
'properties': {
'action': {'enum': [consts.MESSAGE_DELETE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'message_id': {'type': 'string'},
'claim_id': {'type': 'string'}
},
'required': ['queue_name', 'message_id'],
}
},
'required': ['action', 'headers', 'body']
},
consts.MESSAGE_DELETE_MANY: {
'properties': {
'action': {'enum': [consts.MESSAGE_DELETE_MANY]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'message_ids': {'type': 'array'},
'claim_ids': {'type': 'array'},
'pop': {'type': 'integer'}
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body']
},
# Claims
consts.CLAIM_CREATE: {
'properties': {
'action': {'enum': [consts.CLAIM_CREATE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'limit': {'type': 'integer'},
'ttl': {'type': 'integer'},
'grace': {'type': 'integer'}
},
'required': ['queue_name'],
}
},
'required': ['action', 'headers', 'body']
},
consts.CLAIM_GET: {
'properties': {
'action': {'enum': [consts.CLAIM_GET]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'claim_id': {'type': 'string'}
},
'required': ['queue_name', 'claim_id'],
}
},
'required': ['action', 'headers', 'body']
},
consts.CLAIM_UPDATE: {
'properties': {
'action': {'enum': [consts.CLAIM_UPDATE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'claim_id': {'type': 'string'},
'ttl': {'type': 'integer'}
},
'required': ['queue_name', 'claim_id'],
}
},
'required': ['action', 'headers', 'body']
},
consts.CLAIM_DELETE: {
'properties': {
'action': {'enum': [consts.CLAIM_DELETE]},
'headers': {
'type': 'object',
'properties': headers,
'required': ['Client-ID', 'X-Project-ID']
},
'body': {
'type': 'object',
'properties': {
'queue_name': {'type': 'string'},
'claim_id': {'type': 'string'}
},
'required': ['queue_name', 'claim_id'],
}
},
'required': ['action', 'headers', 'body']
},
# Pools
consts.POOL_LIST: {
@ -241,4 +603,4 @@ class RequestSchema(v1.RequestSchema):
'required': ['action', 'headers', 'body'],
'admin': True,
},
})
}

View File

@ -19,23 +19,6 @@ from oslo_serialization import jsonutils
from zaqar.tests.unit.transport.wsgi import base
EXPECTED_VERSIONS = [
{
'id': '1',
'status': 'DEPRECATED',
'updated': '2014-9-11T17:47:05Z',
'media-types': [
{
'base': 'application/json',
'type': 'application/vnd.openstack.messaging-v1+json'
}
],
'links': [
{
'href': '/v1/',
'rel': 'self'
}
]
},
{
'id': '1.1',
'status': 'DEPRECATED',
@ -82,5 +65,5 @@ class TestVersion(base.TestBase):
versions = jsonutils.loads(response[0])['versions']
self.assertEqual(falcon.HTTP_300, self.srmock.status)
self.assertEqual(3, len(versions))
self.assertEqual(2, len(versions))
self.assertEqual(EXPECTED_VERSIONS, versions)

View File

@ -1,43 +0,0 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test Auth."""
import falcon
from falcon import testing
from keystonemiddleware import auth_token
from oslo_utils import uuidutils
from zaqar.tests.unit.transport.wsgi import base
class TestAuth(base.V1Base):
    """Verify that keystone auth middleware is wired into the v1 app."""

    config_file = 'keystone_auth.conf'

    def setUp(self):
        super(TestAuth, self).setUp()
        self.headers = {'Client-ID': uuidutils.generate_uuid()}

    def test_auth_install(self):
        # The WSGI pipeline must be wrapped by keystonemiddleware.
        self.assertIsInstance(self.app._auth_app, auth_token.AuthProtocol)

    def test_non_authenticated(self):
        # A request without a token must be rejected with 401.
        env = testing.create_environ(self.url_prefix + '/480924/queues/',
                                     method='GET',
                                     headers=self.headers)
        self.app(env, self.srmock)
        self.assertEqual(falcon.HTTP_401, self.srmock.status)

View File

@ -1,257 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from unittest import mock
import ddt
import falcon
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
from testtools import matchers
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ddt.ddt
class TestClaimsMongoDB(base.V1Base):
    """Functional tests for the v1 claims resource on MongoDB."""

    config_file = 'wsgi_mongodb.conf'

    @testing.requires_mongodb
    def setUp(self):
        super(TestClaimsMongoDB, self).setUp()

        self.project_id = '480924'
        self.queue_path = self.url_prefix + '/queues/fizbit'
        self.claims_path = self.queue_path + '/claims'
        self.messages_path = self.queue_path + '/messages'

        # The queue must exist before anything can be claimed from it.
        doc = '{"_ttl": 60}'
        self.simulate_put(self.queue_path, self.project_id, body=doc)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # Seed ten messages so the claim tests have something to grab.
        doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * 10)
        self.simulate_post(self.queue_path + '/messages', self.project_id,
                           body=doc,
                           headers={'Client-ID':
                                    uuidutils.generate_uuid()})
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

    def tearDown(self):
        # Drop the control and message databases so state never leaks
        # between test runs.
        storage = self.boot.storage._storage
        control = self.boot.control
        connection = storage.connection

        connection.drop_database(control.queues_database)
        for db in storage.message_databases:
            connection.drop_database(db)

        self.simulate_delete(self.queue_path, self.project_id)
        super(TestClaimsMongoDB, self).tearDown()

    @ddt.data(None, '[', '[]', '{}', '.', '"fail"')
    def test_bad_claim(self, doc):
        # Malformed bodies are rejected both on create and on update.
        self.simulate_post(self.claims_path, self.project_id, body=doc)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        href = self._get_a_claim()
        self.simulate_patch(href, self.project_id, body=doc)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_exceeded_claim(self):
        # limit=21 exceeds the configured per-claim maximum.
        self.simulate_post(self.claims_path, self.project_id,
                           body='{"ttl": 100, "grace": 60}',
                           query_string='limit=21')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))
    def test_unacceptable_ttl_or_grace(self, ttl_grace):
        ttl, grace = ttl_grace
        self.simulate_post(self.claims_path, self.project_id,
                           body=jsonutils.dumps({'ttl': ttl,
                                                 'grace': grace}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data(-1, 59, 43201)
    def test_unacceptable_new_ttl(self, ttl):
        href = self._get_a_claim()

        self.simulate_patch(href, self.project_id,
                            body=jsonutils.dumps({'ttl': ttl}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def _get_a_claim(self):
        # Create one claim and return its Location href.
        doc = '{"ttl": 100, "grace": 60}'
        self.simulate_post(self.claims_path, self.project_id, body=doc)
        return self.srmock.headers_dict['Location']

    def test_lifecycle(self):
        doc = '{"ttl": 100, "grace": 60}'

        # First, claim some messages
        body = self.simulate_post(self.claims_path, self.project_id,
                                  body=doc)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        claimed = jsonutils.loads(body[0])
        claim_href = self.srmock.headers_dict['Location']
        message_href, params = claimed[0]['href'].split('?')

        # No more messages to claim
        self.simulate_post(self.claims_path, self.project_id, body=doc,
                           query_string='limit=3')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        headers = {
            'Client-ID': uuidutils.generate_uuid(),
        }

        # Listing messages, by default, won't include claimed
        body = self.simulate_get(self.messages_path, self.project_id,
                                 headers=headers)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Include claimed messages this time
        body = self.simulate_get(self.messages_path, self.project_id,
                                 query_string='include_claimed=true',
                                 headers=headers)
        listed = jsonutils.loads(body[0])
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(len(claimed), len(listed['messages']))

        # Read the claim back 10 simulated seconds later so its age is
        # guaranteed to be positive.
        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
            body = self.simulate_get(claim_href, self.project_id)

        claim = jsonutils.loads(body[0])

        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(claim_href,
                         self.srmock.headers_dict['Content-Location'])
        self.assertEqual(100, claim['ttl'])
        # NOTE(cpp-cabrera): verify that claim age is non-negative
        self.assertThat(claim['age'], matchers.GreaterThan(-1))

        # Try to delete the message without submitting a claim_id
        self.simulate_delete(message_href, self.project_id)
        self.assertEqual(falcon.HTTP_403, self.srmock.status)

        # Delete the message and its associated claim
        self.simulate_delete(message_href, self.project_id,
                             query_string=params)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Try to get it from the wrong project
        self.simulate_get(message_href, 'bogus_project', query_string=params)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Get the message
        self.simulate_get(message_href, self.project_id, query_string=params)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Update the claim
        new_claim_ttl = '{"ttl": 60}'
        creation = timeutils.utcnow()
        self.simulate_patch(claim_href, self.project_id, body=new_claim_ttl)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Get the claimed messages (again)
        body = self.simulate_get(claim_href, self.project_id)
        query = timeutils.utcnow()
        claim = jsonutils.loads(body[0])
        message_href, params = claim['messages'][0]['href'].split('?')

        self.assertEqual(60, claim['ttl'])
        estimated_age = timeutils.delta_seconds(creation, query)
        self.assertGreater(estimated_age, claim['age'])

        # Delete the claim
        self.simulate_delete(claim['href'], 'bad_id')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        self.simulate_delete(claim['href'], self.project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Try to delete a message with an invalid claim ID
        self.simulate_delete(message_href, self.project_id,
                             query_string=params)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # Make sure it wasn't deleted!
        self.simulate_get(message_href, self.project_id, query_string=params)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        # Try to get a claim that doesn't exist
        self.simulate_get(claim['href'])
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Try to update a claim that doesn't exist
        self.simulate_patch(claim['href'], body=doc)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

    def test_post_claim_nonexistent_queue(self):
        path = self.url_prefix + '/queues/nonexistent/claims'
        self.simulate_post(path, self.project_id,
                           body='{"ttl": 100, "grace": 60}')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_get_claim_nonexistent_queue(self):
        path = self.url_prefix + '/queues/nonexistent/claims/aaabbbba'
        self.simulate_get(path)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

    # NOTE(cpp-cabrera): regression test against bug #1203842
    def test_get_nonexistent_claim_404s(self):
        self.simulate_get(self.claims_path + '/a')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

    def test_delete_nonexistent_claim_204s(self):
        self.simulate_delete(self.claims_path + '/a')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_patch_nonexistent_claim_404s(self):
        patch_data = jsonutils.dumps({'ttl': 100})
        self.simulate_patch(self.claims_path + '/a', body=patch_data)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
class TestClaimsFaultyDriver(base.V1BaseFaulty):
    """Every claim operation must surface a 503 when storage is broken."""

    config_file = 'wsgi_faulty.conf'

    def test_simple(self):
        project_id = '480924'
        claims_path = self.url_prefix + '/queues/fizbit/claims'
        doc = '{"ttl": 100, "grace": 60}'

        self.simulate_post(claims_path, project_id, body=doc)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

        self.simulate_get(claims_path + '/nichts', project_id)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

        self.simulate_patch(claims_path + '/nichts', project_id, body=doc)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

        self.simulate_delete(claims_path + '/foo', project_id)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

View File

@ -1,99 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import falcon
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from zaqar import storage
from zaqar.tests.unit.transport.wsgi import base
class TestDefaultLimits(base.V1Base):
    """Check that listings and claims are capped at the default limits."""

    config_file = 'wsgi_mongodb_default_limits.conf'

    def setUp(self):
        super(TestDefaultLimits, self).setUp()
        self.queue_path = self.url_prefix + '/queues'
        self.q1_queue_path = self.queue_path + '/' + uuidutils.generate_uuid()
        self.messages_path = self.q1_queue_path + '/messages'
        self.claims_path = self.q1_queue_path + '/claims'

        self.simulate_put(self.q1_queue_path)

    def tearDown(self):
        self.simulate_delete(self.queue_path)
        super(TestDefaultLimits, self).tearDown()

    def test_queue_listing(self):
        # 2 queues to list
        self.addCleanup(self.simulate_delete, self.queue_path + '/q2')
        self.simulate_put(self.queue_path + '/q2')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # One more queue than fits on a page: the page must be truncated
        # to DEFAULT_QUEUES_PER_PAGE entries.
        with self._prepare_queues(storage.DEFAULT_QUEUES_PER_PAGE + 1):
            result = self.simulate_get(self.queue_path)
            self.assertEqual(falcon.HTTP_200, self.srmock.status)

            queues = jsonutils.loads(result[0])['queues']
            self.assertEqual(storage.DEFAULT_QUEUES_PER_PAGE, len(queues))

    def test_message_listing(self):
        self._prepare_messages(storage.DEFAULT_MESSAGES_PER_PAGE + 1)

        result = self.simulate_get(self.messages_path,
                                   headers={'Client-ID':
                                            uuidutils.generate_uuid()})
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        messages = jsonutils.loads(result[0])['messages']
        self.assertEqual(storage.DEFAULT_MESSAGES_PER_PAGE, len(messages))

    def test_claim_creation(self):
        self._prepare_messages(storage.DEFAULT_MESSAGES_PER_CLAIM + 1)

        result = self.simulate_post(self.claims_path,
                                    body='{"ttl": 60, "grace": 60}')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        messages = jsonutils.loads(result[0])
        self.assertEqual(storage.DEFAULT_MESSAGES_PER_CLAIM, len(messages))

    @contextlib.contextmanager
    def _prepare_queues(self, count):
        # Create ``count`` throw-away queues for the duration of the
        # with-block, then delete them again.
        queue_paths = [self.queue_path + '/multi-{0}'.format(i)
                       for i in range(count)]

        for path in queue_paths:
            self.simulate_put(path)
            self.assertEqual(falcon.HTTP_201, self.srmock.status)

        yield

        for path in queue_paths:
            self.simulate_delete(path)

    def _prepare_messages(self, count):
        # Post ``count`` identical messages into the test queue.
        doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * count)
        self.simulate_post(self.messages_path, body=doc,
                           headers={'Client-ID': uuidutils.generate_uuid()})
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
View File

@ -1,33 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from zaqar.tests.unit.transport.wsgi import base
class TestHealth(base.V1Base):
    """The v1 health endpoint answers both GET and HEAD with 204."""

    config_file = 'wsgi_mongodb.conf'

    def _assert_healthy(self, simulate):
        # Health responses carry no body: 204 plus an empty payload.
        response = simulate('/v1/health')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)
        self.assertEqual([], response)

    def test_get(self):
        self._assert_healthy(self.simulate_get)

    def test_head(self):
        self._assert_healthy(self.simulate_head)

View File

@ -1,57 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import falcon
from oslo_serialization import jsonutils
from urllib import parse as urlparse
from zaqar.tests.unit.transport.wsgi import base
class TestHomeDocument(base.V1Base):
    """Validate the JSON home document served at the API root."""

    config_file = 'wsgi_mongodb.conf'

    def test_json_response(self):
        """The root is served as parseable application/json-home."""
        payload = self.simulate_get(self.url_prefix + '/')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        self.assertEqual('application/json-home',
                         self.srmock.headers_dict['Content-Type'])

        try:
            jsonutils.loads(payload[0])
        except ValueError:
            self.fail('Home document is not valid JSON')

    def test_href_template(self):
        """href-templates carry the version prefix and expand correctly."""
        payload = self.simulate_get(self.url_prefix + '/')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        home = jsonutils.loads(payload[0])
        queue_template = home['resources']['rel/queue']['href-template']

        # Every advertised href template must begin with the version prefix.
        for rel in list(home['resources']):
            href_template = home['resources'][rel]['href-template']
            self.assertTrue(href_template.startswith(self.url_prefix))

        expected = ('https://zaqar.example.com' + self.url_prefix +
                    '/queues/foo')

        # Joining against a base with or without a trailing slash must
        # resolve to the same absolute URL.
        base_url = 'https://zaqar.example.com' + self.url_prefix
        joined = urlparse.urljoin(base_url, queue_template)
        self.assertEqual(expected, joined.format(queue_name='foo'))

        joined = urlparse.urljoin(base_url + '/', queue_template)
        self.assertEqual(expected, joined.format(queue_name='foo'))

View File

@ -1,82 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from falcon import testing
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from zaqar.tests.unit.transport.wsgi import base
class TestMediaType(base.V1Base):
    """Content negotiation and Content-Type enforcement tests."""

    config_file = 'wsgi_mongodb.conf'

    def test_json_only_endpoints_with_wrong_accept_header(self):
        """JSON-only endpoints reject an XML Accept header with 406."""
        endpoints = (
            ('GET', self.url_prefix + '/queues'),
            ('GET', self.url_prefix + '/queues/nonexistent/metadata'),
            ('GET', self.url_prefix + '/queues/nonexistent/stats'),
            ('POST', self.url_prefix + '/queues/nonexistent/messages'),
            ('GET', self.url_prefix + '/queues/nonexistent/messages/deadbeaf'),
            ('POST', self.url_prefix + '/queues/nonexistent/claims'),
            ('GET', self.url_prefix + '/queues/nonexistent/claims/0ad'),
            ('GET', self.url_prefix + '/health'),
        )

        for verb, endpoint in endpoints:
            request_headers = {
                'Client-ID': uuidutils.generate_uuid(),
                'Accept': 'application/xml',
            }

            environ = testing.create_environ(endpoint,
                                             method=verb,
                                             headers=request_headers)
            self.app(environ, self.srmock)
            self.assertEqual(falcon.HTTP_406, self.srmock.status)

    def test_request_with_body_and_urlencoded_contenttype_header_fails(self):
        # NOTE(Eva-i): this test case makes sure wsgi 'before' hook
        # "require_content_type_be_non_urlencoded" works to prevent
        # bug/1547100.
        eww_queue_path = self.url_prefix + '/queues/eww'
        eww_queue_messages_path = eww_queue_path + '/messages'
        sample_message = jsonutils.dumps([{'body': {'eww!'}, 'ttl': 200}])
        bad_headers = {
            'Client-ID': uuidutils.generate_uuid(),
            'Content-Type': 'application/x-www-form-urlencoded',
        }

        # A queue PUT has no body, so the bad Content-Type is harmless here.
        self.simulate_put(eww_queue_path, headers=bad_headers)
        self.addCleanup(self.simulate_delete, eww_queue_path,
                        headers=self.headers)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # Posting a message with sane headers succeeds...
        self.simulate_post(eww_queue_messages_path, body=sample_message,
                           headers=self.headers)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # ...but the urlencoded Content-Type plus a body is rejected.
        self.simulate_post(eww_queue_messages_path, body=sample_message,
                           headers=bad_headers)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

View File

@ -1,505 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from unittest import mock
import ddt
import falcon
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
from testtools import matchers
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
from zaqar.transport import validation
@ddt.ddt
class TestMessagesMongoDB(base.V1Base):
    """End-to-end tests for the v1 messages resource against MongoDB.

    setUp registers the test queue twice -- once with a project id and once
    without -- so individual tests may exercise either addressing mode.
    """

    config_file = 'wsgi_mongodb.conf'

    @testing.requires_mongodb
    def setUp(self):
        super(TestMessagesMongoDB, self).setUp()

        # When pooling is enabled, register four pools up front so the
        # message operations have storage partitions to land in.
        if self.conf.pooling:
            for i in range(4):
                uri = "%s/%s" % (self.mongodb_url, str(i))
                doc = {'weight': 100, 'uri': uri}
                self.simulate_put(self.url_prefix + '/pools/' + str(i),
                                  body=jsonutils.dumps(doc))
                self.assertEqual(falcon.HTTP_201, self.srmock.status)

        self.project_id = '7e55e1a7e'

        # TODO(kgriffs): Add support in self.simulate_* for a "base path"
        # so that we don't have to concatenate against self.url_prefix
        # all over the place.
        self.queue_path = self.url_prefix + '/queues/fizbit'
        self.messages_path = self.queue_path + '/messages'

        doc = '{"_ttl": 60}'
        self.simulate_put(self.queue_path, self.project_id, body=doc)

        # NOTE(kgriffs): Also register without a project for tests
        # that do not specify a project.
        #
        # TODO(kgriffs): Should a project id always be required or
        # automatically supplied in the simulate_* methods?
        self.simulate_put(self.queue_path, body=doc)

        self.headers = {
            'Client-ID': uuidutils.generate_uuid(),
        }

    def tearDown(self):
        self.simulate_delete(self.queue_path, self.project_id)
        if self.conf.pooling:
            for i in range(4):
                self.simulate_delete(self.url_prefix + '/pools/' + str(i))

        super(TestMessagesMongoDB, self).tearDown()

    def _test_post(self, sample_messages):
        """Post *sample_messages* and verify creation, GET, and bulk GET."""
        sample_doc = jsonutils.dumps(sample_messages)

        result = self.simulate_post(self.messages_path, self.project_id,
                                    body=sample_doc, headers=self.headers)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        result_doc = jsonutils.loads(result[0])

        msg_ids = self._get_msg_ids(self.srmock.headers_dict)
        self.assertEqual(len(sample_messages), len(msg_ids))

        expected_resources = [str(self.messages_path + '/' + id)
                              for id in msg_ids]
        self.assertEqual(expected_resources, result_doc['resources'])

        # NOTE(kgriffs): As of the Icehouse release, drivers are
        # required to either completely succeed, or completely fail
        # to enqueue the entire batch of messages.
        self.assertFalse(result_doc['partial'])

        self.assertEqual(len(sample_messages), len(msg_ids))

        # Map ttl -> body; the sample messages use distinct ttls, so this
        # lets us match a fetched message back to its original body.
        lookup = dict([(m['ttl'], m['body']) for m in sample_messages])

        # Test GET on the message resource directly
        # NOTE(cpp-cabrera): force the passing of time to age a message
        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'
        now = timeutils.utcnow() + datetime.timedelta(seconds=10)
        with mock.patch(timeutils_utcnow) as mock_utcnow:
            mock_utcnow.return_value = now
            for msg_id in msg_ids:
                message_uri = self.messages_path + '/' + msg_id

                # Wrong project ID
                self.simulate_get(message_uri, '777777')
                self.assertEqual(falcon.HTTP_404, self.srmock.status)

                # Correct project ID
                result = self.simulate_get(message_uri, self.project_id)
                self.assertEqual(falcon.HTTP_200, self.srmock.status)
                self.assertEqual(message_uri,
                                 self.srmock.headers_dict['Content-Location'])

                # Check message properties
                message = jsonutils.loads(result[0])
                self.assertEqual(message_uri, message['href'])
                self.assertEqual(lookup[message['ttl']], message['body'])

                # no negative age
                # NOTE(cpp-cabrera): testtools lacks GreaterThanEqual on py26
                self.assertThat(message['age'],
                                matchers.GreaterThan(-1))

        # Test bulk GET
        query_string = 'ids=' + ','.join(msg_ids)
        result = self.simulate_get(self.messages_path, self.project_id,
                                   query_string=query_string)

        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        result_doc = jsonutils.loads(result[0])
        expected_ttls = set(m['ttl'] for m in sample_messages)
        actual_ttls = set(m['ttl'] for m in result_doc)
        self.assertFalse(expected_ttls - actual_ttls)

    def test_exceeded_payloads(self):
        """Requests beyond the bulk/paging limits are rejected with 400."""
        # Get a valid message id
        self._post_messages(self.messages_path)
        msg_id = self._get_msg_id(self.srmock.headers_dict)

        # Bulk GET restriction
        query_string = 'ids=' + ','.join([msg_id] * 21)
        self.simulate_get(self.messages_path, self.project_id,
                          query_string=query_string)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # Listing restriction
        self.simulate_get(self.messages_path, self.project_id,
                          query_string='limit=21',
                          headers=self.headers)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # Bulk deletion restriction
        query_string = 'ids=' + ','.join([msg_id] * 22)
        self.simulate_delete(self.messages_path, self.project_id,
                             query_string=query_string)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_post_single(self):
        sample_messages = [
            {'body': {'key': 'value'}, 'ttl': 200},
        ]

        self._test_post(sample_messages)

    def test_post_multiple(self):
        sample_messages = [
            {'body': 239, 'ttl': 100},
            {'body': {'key': 'value'}, 'ttl': 200},
            {'body': [1, 3], 'ttl': 300},
        ]

        self._test_post(sample_messages)

    def test_post_to_non_ascii_queue(self):
        # NOTE(kgriffs): This test verifies that routes with
        # embedded queue name params go through the validation
        # hook, regardless of the target resource.
        path = self.url_prefix + '/queues/non-ascii-n\u0153me/messages'

        self._post_messages(path)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_post_with_long_queue_name(self):
        # NOTE(kgriffs): This test verifies that routes with
        # embedded queue name params go through the validation
        # hook, regardless of the target resource.
        queues_path = self.url_prefix + '/queues/'

        game_title = 'v' * validation.QUEUE_NAME_MAX_LEN
        self._post_messages(queues_path + game_title + '/messages')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        game_title += 'v'
        self._post_messages(queues_path + game_title + '/messages')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_post_to_missing_queue(self):
        self._post_messages(self.url_prefix + '/queues/nonexistent/messages')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

    def test_get_from_missing_queue(self):
        self.simulate_get(self.url_prefix + '/queues/nonexistent/messages',
                          self.project_id,
                          headers={'Client-ID':
                                   'dfcd3238-425c-11e3-8a80-28cfe91478b9'})
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    @ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')
    def test_bad_client_id(self, text_id):
        """Malformed Client-ID headers are rejected with 400."""
        self.simulate_post(self.queue_path + '/messages',
                           body='{"ttl": 60, "body": ""}',
                           headers={'Client-ID': text_id})

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_get(self.queue_path + '/messages',
                          query_string='limit=3&echo=true',
                          headers={'Client-ID': text_id})

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data(None, '[', '[]', '{}', '.')
    def test_post_bad_message(self, document):
        """Bodies that are not a non-empty JSON array are rejected."""
        self.simulate_post(self.queue_path + '/messages',
                           body=document,
                           headers=self.headers)

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data(-1, 59, 1209601)
    def test_unacceptable_ttl(self, ttl):
        """TTLs outside the accepted range are rejected with 400."""
        self.simulate_post(self.queue_path + '/messages',
                           body=jsonutils.dumps([{'ttl': ttl, 'body': None}]),
                           headers=self.headers)

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_exceeded_message_posting(self):
        # Total (raw request) size
        doc = jsonutils.dumps([{'body': "some body", 'ttl': 100}] * 20,
                              indent=4)

        # Pad the document one byte past the configured maximum.
        max_len = self.transport_cfg.max_messages_post_size
        long_doc = doc + (' ' * (max_len - len(doc) + 1))

        self.simulate_post(self.queue_path + '/messages',
                           body=long_doc,
                           headers=self.headers)

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data('{"overflow": 9223372036854775808}',
              '{"underflow": -9223372036854775809}')
    def test_unsupported_json(self, document):
        """Integers outside the signed 64-bit range are rejected."""
        self.simulate_post(self.queue_path + '/messages',
                           body=document,
                           headers=self.headers)

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_delete(self):
        self._post_messages(self.messages_path)
        msg_id = self._get_msg_id(self.srmock.headers_dict)
        target = self.messages_path + '/' + msg_id

        self.simulate_get(target, self.project_id)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        self.simulate_delete(target, self.project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        self.simulate_get(target, self.project_id)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Safe to delete non-existing ones
        self.simulate_delete(target, self.project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_bulk_delete(self):
        path = self.queue_path + '/messages'
        self._post_messages(path, repeat=5)
        [target, params] = self.srmock.headers_dict['location'].split('?')

        # Deleting the whole collection is denied
        self.simulate_delete(path, self.project_id)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_delete(target, self.project_id, query_string=params)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        self.simulate_get(target, self.project_id, query_string=params)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Safe to delete non-existing ones
        self.simulate_delete(target, self.project_id, query_string=params)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Even after the queue is gone
        self.simulate_delete(self.queue_path, self.project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        self.simulate_delete(target, self.project_id, query_string=params)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_list(self):
        path = self.queue_path + '/messages'
        self._post_messages(path, repeat=10)

        query_string = 'limit=3&echo=true'
        body = self.simulate_get(path, self.project_id,
                                 query_string=query_string,
                                 headers=self.headers)

        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(path + '?' + query_string,
                         self.srmock.headers_dict['Content-Location'])

        # Follow the 'next' links until the listing is exhausted (204);
        # 10 messages at limit=3 should take exactly 4 pages.
        cnt = 0
        while self.srmock.status == falcon.HTTP_200:
            contents = jsonutils.loads(body[0])
            [target, params] = contents['links'][0]['href'].split('?')

            for msg in contents['messages']:
                self.simulate_get(msg['href'], self.project_id)
                self.assertEqual(falcon.HTTP_200, self.srmock.status)

            body = self.simulate_get(target, self.project_id,
                                     query_string=params,
                                     headers=self.headers)
            cnt += 1

        self.assertEqual(4, cnt)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Stats
        body = self.simulate_get(self.queue_path + '/stats', self.project_id)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        message_stats = jsonutils.loads(body[0])['messages']
        self.assertEqual(self.queue_path + '/stats',
                         self.srmock.headers_dict['Content-Location'])

        # NOTE(kgriffs): The other parts of the stats are tested
        # in tests.storage.base and so are not repeated here.
        expected_pattern = self.queue_path + '/messages/[^/]+$'
        for message_stat_name in ('oldest', 'newest'):
            self.assertThat(message_stats[message_stat_name]['href'],
                            matchers.MatchesRegex(expected_pattern))

        # NOTE(kgriffs): Try to get messages for a missing queue
        self.simulate_get(self.url_prefix + '/queues/nonexistent/messages',
                          self.project_id,
                          headers=self.headers)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_list_with_bad_marker(self):
        path = self.queue_path + '/messages'
        self._post_messages(path, repeat=5)

        query_string = 'limit=3&echo=true&marker=sfhlsfdjh2048'
        self.simulate_get(path, self.project_id,
                          query_string=query_string,
                          headers=self.headers)

        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_no_uuid(self):
        """Requests missing the Client-ID header are rejected with 400."""
        path = self.queue_path + '/messages'

        self.simulate_post(path, '7e7e7e',
                           headers={},
                           body='[{"body": 0, "ttl": 100}]')

        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_get(path, '7e7e7e', headers={})
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    # NOTE(cpp-cabrera): regression test against bug #1210633
    def test_when_claim_deleted_then_messages_unclaimed(self):
        path = self.queue_path
        self._post_messages(path + '/messages', repeat=5)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # post claim
        self.simulate_post(path + '/claims', self.project_id,
                           body='{"ttl": 100, "grace": 100}')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        location = self.srmock.headers_dict['location']

        # release claim
        self.simulate_delete(location, self.project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # get unclaimed messages
        self.simulate_get(path + '/messages', self.project_id,
                          query_string='echo=true',
                          headers=self.headers)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

    # NOTE(cpp-cabrera): regression test against bug #1203842
    def test_get_nonexistent_message_404s(self):
        path = self.url_prefix + '/queues/notthere/messages/a'
        self.simulate_get(path)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

    def test_get_multiple_invalid_messages_204s(self):
        path = self.url_prefix + '/queues/notthere/messages'
        self.simulate_get(path, query_string='ids=a,b,c')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_delete_multiple_invalid_messages_204s(self):
        path = self.url_prefix + '/queues/notthere/messages'
        self.simulate_delete(path, query_string='ids=a,b,c')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_delete_message_with_invalid_claim_doesnt_delete_message(self):
        path = self.queue_path
        resp = self._post_messages(path + '/messages', 1)
        location = jsonutils.loads(resp[0])['resources'][0]

        self.simulate_delete(location, self.project_id,
                             query_string='claim_id=invalid')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_get(location, self.project_id)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

    def test_no_duplicated_messages_path_in_href(self):
        """Test for bug 1240897."""
        path = self.queue_path + '/messages'
        self._post_messages(path, repeat=1)

        msg_id = self._get_msg_id(self.srmock.headers_dict)

        query_string = 'ids=%s' % msg_id
        body = self.simulate_get(path, self.project_id,
                                 query_string=query_string,
                                 headers=self.headers)
        messages = jsonutils.loads(body[0])

        self.assertNotIn(self.queue_path + '/messages/messages',
                         messages[0]['href'])

    def _post_messages(self, target, repeat=1):
        """POST *repeat* identical messages; return the raw response body."""
        doc = jsonutils.dumps([{'body': 239, 'ttl': 300}] * repeat)
        return self.simulate_post(target, self.project_id, body=doc,
                                  headers=self.headers)

    def _get_msg_id(self, headers):
        """Return the first message id from a creation response."""
        return self._get_msg_ids(headers)[0]

    def _get_msg_ids(self, headers):
        # The location header ends with '?ids=<id1>,<id2>,...'
        return headers['location'].rsplit('=', 1)[-1].split(',')
class TestMessagesMongoDBPooled(TestMessagesMongoDB):
    """Re-run the message tests against a pooled MongoDB deployment."""

    config_file = 'wsgi_mongodb_pooled.conf'

    # TODO(cpp-cabrera): remove this skipTest once pooled queue
    # listing is implemented
    def test_list(self):
        """Overridden: pooled queue listing is not implemented yet."""
        self.skipTest("Need to implement pooled queue listing.")
class TestMessagesFaultyDriver(base.V1BaseFaulty):
    """Verify message endpoints report 503 when the driver is broken."""

    config_file = 'wsgi_faulty.conf'

    def test_simple(self):
        """Every message operation against a faulty driver yields 503."""
        tenant = 'xyz'
        messages_path = self.url_prefix + '/queues/fizbit/messages'
        payload = '[{"body": 239, "ttl": 100}]'
        client_headers = {
            'Client-ID': uuidutils.generate_uuid(),
        }

        # (method, target, extra kwargs) for each message operation.
        attempts = (
            (self.simulate_post, messages_path,
             {'body': payload, 'headers': client_headers}),
            (self.simulate_get, messages_path, {'headers': client_headers}),
            (self.simulate_get, messages_path + '/nonexistent', {}),
            (self.simulate_delete, messages_path + '/nada', {}),
        )

        for call, target, kwargs in attempts:
            call(target, tenant, **kwargs)
            self.assertEqual(falcon.HTTP_503, self.srmock.status)

View File

@ -1,335 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import contextlib
import ddt
import falcon
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@contextlib.contextmanager
def pool(test, name, weight, uri, options=None):
    """A context manager for constructing a pool for use in testing.

    Deletes the pool after exiting the context.

    :param test: Must expose simulate_* methods
    :param name: Name for this pool
    :type name: str
    :type weight: int
    :type uri: str
    :param options: Driver-specific options (defaults to an empty dict)
    :type options: dict
    :returns: (name, weight, uri, options)
    :rtype: see above
    """
    # NOTE: avoid a mutable default argument ({}), which would be shared
    # across calls; treat None as "no options".
    options = {} if options is None else options

    # Suffix the URI with a UUID so concurrently created pools never collide.
    uri = "%s/%s" % (uri, uuidutils.generate_uuid())
    doc = {'weight': weight, 'uri': uri, 'options': options}
    path = test.url_prefix + '/pools/' + name

    test.simulate_put(path, body=jsonutils.dumps(doc))

    try:
        yield name, weight, uri, options

    finally:
        test.simulate_delete(path)
@contextlib.contextmanager
def pools(test, count, uri):
    """A context manager for constructing pools for use in testing.

    Deletes the pools after exiting the context.

    :param test: Must expose simulate_* methods
    :param count: Number of pools to create
    :type count: int
    :param uri: Base storage URI; each pool gets a unique UUID suffix
    :type uri: str
    :returns: a list of (path, weight, options) tuples, one per pool;
        pool i has weight i and options {str(i): i}
    :rtype: [(str, int, dict)]
    """
    mongo_url = uri
    base = test.url_prefix + '/pools/'
    args = [(base + str(i), i,
             {str(i): i})
            for i in range(count)]

    for path, weight, option in args:
        # Unique URI per pool so test runs never collide.
        uri = "%s/%s" % (mongo_url, uuidutils.generate_uuid())
        doc = {'weight': weight, 'uri': uri, 'options': option}
        test.simulate_put(path, body=jsonutils.dumps(doc))

    try:
        yield args
    finally:
        for path, _, _ in args:
            test.simulate_delete(path)
@ddt.ddt
class TestPoolsMongoDB(base.V1Base):
config_file = 'wsgi_mongodb_pooled.conf'
    @testing.requires_mongodb
    def setUp(self):
        """Create a default pool for each test to operate on."""
        super(TestPoolsMongoDB, self).setUp()
        self.doc = {'weight': 100, 'uri': self.mongodb_url}
        self.pool = self.url_prefix + '/pools/' + uuidutils.generate_uuid()
        self.simulate_put(self.pool, body=jsonutils.dumps(self.doc))
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
    def tearDown(self):
        """Remove the default pool created in setUp."""
        super(TestPoolsMongoDB, self).tearDown()
        self.simulate_delete(self.pool)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)
    def test_put_pool_works(self):
        """PUT on a fresh pool name returns 201."""
        name = uuidutils.generate_uuid()
        weight, uri = self.doc['weight'], self.doc['uri']
        with pool(self, name, weight, uri):
            self.assertEqual(falcon.HTTP_201, self.srmock.status)
    def test_put_raises_if_missing_fields(self):
        """PUT without 'uri', or without 'weight', is rejected with 400."""
        path = self.url_prefix + '/pools/' + uuidutils.generate_uuid()
        self.simulate_put(path, body=jsonutils.dumps({'weight': 100}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_put(path,
                          body=jsonutils.dumps(
                              {'uri': self.mongodb_url}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    @ddt.data(-1, 2**32+1, 'big')
    def test_put_raises_if_invalid_weight(self, weight):
        """Out-of-range or non-numeric weights are rejected with 400."""
        path = self.url_prefix + '/pools/' + uuidutils.generate_uuid()
        doc = {'weight': weight, 'uri': 'a'}
        self.simulate_put(path,
                          body=jsonutils.dumps(doc))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    @ddt.data(-1, 2**32+1, [], 'localhost:27017')
    def test_put_raises_if_invalid_uri(self, uri):
        """Non-string or malformed URIs are rejected with 400."""
        path = self.url_prefix + '/pools/' + uuidutils.generate_uuid()
        self.simulate_put(path,
                          body=jsonutils.dumps({'weight': 1, 'uri': uri}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    @ddt.data(-1, 'wee', [])
    def test_put_raises_if_invalid_options(self, options):
        """Non-dict 'options' values are rejected with 400."""
        path = self.url_prefix + '/pools/' + uuidutils.generate_uuid()
        doc = {'weight': 1, 'uri': 'a', 'options': options}
        self.simulate_put(path, body=jsonutils.dumps(doc))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    def test_put_existing_overwrites(self):
        """PUT on an existing pool overwrites it and still returns 201."""
        # NOTE(cabrera): setUp creates default pool
        expect = self.doc
        self.simulate_put(self.pool,
                          body=jsonutils.dumps(expect))
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        result = self.simulate_get(self.pool)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        doc = jsonutils.loads(result[0])
        self.assertEqual(expect['weight'], doc['weight'])
        self.assertEqual(expect['uri'], doc['uri'])
    def test_delete_works(self):
        """DELETE removes the pool; a follow-up GET returns 404."""
        self.simulate_delete(self.pool)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        self.simulate_get(self.pool)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
    def test_get_nonexisting_raises_404(self):
        """GET on an unknown pool name returns 404."""
        self.simulate_get(self.url_prefix + '/pools/nonexisting')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
    def _pool_expect(self, pool, xhref, xweight, xuri):
        """Assert *pool* has the expected href, weight, and uri prefix."""
        self.assertIn('href', pool)
        self.assertIn('name', pool)
        self.assertEqual(xhref, pool['href'])
        self.assertIn('weight', pool)
        self.assertEqual(xweight, pool['weight'])
        self.assertIn('uri', pool)

        # NOTE(dynarro): compare with startswith() because a UUID suffix is
        # appended to pool URIs to avoid duplicates
        self.assertTrue(pool['uri'].startswith(xuri))
    def test_get_works(self):
        """GET on the default pool returns its href, weight, and uri."""
        result = self.simulate_get(self.pool)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        pool = jsonutils.loads(result[0])
        self._pool_expect(pool, self.pool, self.doc['weight'],
                          self.doc['uri'])
    def test_detailed_get_works(self):
        """GET with detailed=True additionally includes 'options'."""
        result = self.simulate_get(self.pool,
                                   query_string='detailed=True')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        pool = jsonutils.loads(result[0])
        self._pool_expect(pool, self.pool, self.doc['weight'],
                          self.doc['uri'])
        self.assertIn('options', pool)
        self.assertEqual({}, pool['options'])
    def test_patch_raises_if_missing_fields(self):
        """PATCH with only unrecognized fields is rejected with 400."""
        self.simulate_patch(self.pool,
                            body=jsonutils.dumps({'location': 1}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    def _patch_test(self, doc):
        """PATCH the default pool with *doc* and verify the stored result."""
        self.simulate_patch(self.pool,
                            body=jsonutils.dumps(doc))
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        result = self.simulate_get(self.pool,
                                   query_string='detailed=True')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        pool = jsonutils.loads(result[0])
        self._pool_expect(pool, self.pool, doc['weight'],
                          doc['uri'])
        self.assertEqual(doc['options'], pool['options'])
    def test_patch_works(self):
        """A well-formed PATCH updates weight, uri, and options."""
        doc = {'weight': 101,
               'uri': self.mongodb_url,
               'options': {'a': 1}}
        self._patch_test(doc)
    def test_patch_works_with_extra_fields(self):
        """Unknown fields in a PATCH body are ignored, not rejected."""
        doc = {'weight': 101,
               'uri': self.mongodb_url,
               'options': {'a': 1},
               'location': 100, 'partition': 'taco'}
        self._patch_test(doc)
    @ddt.data(-1, 2**32+1, 'big')
    def test_patch_raises_400_on_invalid_weight(self, weight):
        """PATCH with an invalid weight is rejected with 400."""
        self.simulate_patch(self.pool,
                            body=jsonutils.dumps({'weight': weight}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    @ddt.data(-1, 2**32+1, [], 'localhost:27017')
    def test_patch_raises_400_on_invalid_uri(self, uri):
        """PATCH with an invalid uri is rejected with 400."""
        self.simulate_patch(self.pool,
                            body=jsonutils.dumps({'uri': uri}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    @ddt.data(-1, 'wee', [])
    def test_patch_raises_400_on_invalid_options(self, options):
        """PATCH with non-dict options is rejected with 400."""
        self.simulate_patch(self.pool,
                            body=jsonutils.dumps({'options': options}))
        self.assertEqual(falcon.HTTP_400, self.srmock.status)
    def test_patch_raises_404_if_pool_not_found(self):
        """PATCH on an unknown pool name returns 404."""
        self.simulate_patch(self.url_prefix + '/pools/notexists',
                            body=jsonutils.dumps({'weight': 1}))
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
    def test_empty_listing(self):
        """Listing with no pools yields an empty 'pools' list plus links."""
        self.simulate_delete(self.pool)
        result = self.simulate_get(self.url_prefix + '/pools')
        results = jsonutils.loads(result[0])
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(0, len(results['pools']))
        self.assertIn('links', results)
def _listing_test(self, count=10, limit=10,
                  marker=None, detailed=False):
    """Create *count* pools, list them with *limit*, and follow the
    'next' link, verifying both pages and the link's query parameters.

    :param count: number of pools to create for the test
    :param limit: page size requested from the listing endpoint
    :param marker: optional starting marker for the first page
    :param detailed: whether to request (and expect) pool options
    """
    # NOTE(cpp-cabrera): delete initial pool - it will interfere
    # with listing tests
    self.simulate_delete(self.pool)
    query = 'limit={0}&detailed={1}'.format(limit, detailed)
    if marker:
        query += '&marker={0}'.format(marker)

    with pools(self, count, self.doc['uri']) as expected:
        result = self.simulate_get(self.url_prefix + '/pools',
                                   query_string=query)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        results = jsonutils.loads(result[0])
        self.assertIsInstance(results, dict)
        self.assertIn('pools', results)
        self.assertIn('links', results)
        pool_list = results['pools']

        # The 'next' link must echo the limit/detailed parameters
        # and carry a marker for the second page.
        link = results['links'][0]
        self.assertEqual('next', link['rel'])
        href = falcon.uri.parse_query_string(link['href'].split('?')[1])
        self.assertIn('marker', href)
        self.assertEqual(str(limit), href['limit'])
        self.assertEqual(str(detailed).lower(), href['detailed'])

        # Fetch the second page by following the 'next' link.
        next_query_string = ('marker={marker}&limit={limit}'
                             '&detailed={detailed}').format(**href)
        next_result = self.simulate_get(link['href'].split('?')[0],
                                        query_string=next_query_string)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        next_pool = jsonutils.loads(next_result[0])
        next_pool_list = next_pool['pools']

        self.assertIn('links', next_pool)
        if limit < count:
            self.assertEqual(min(limit, count-limit),
                             len(next_pool_list))
        else:
            # NOTE(jeffrey4l): when limit >= count, there will be no
            # pools in the 2nd page.
            self.assertEqual(0, len(next_pool_list))
        self.assertEqual(min(limit, count), len(pool_list))

        for s in pool_list + next_pool_list:
            # NOTE(flwang): It can't assumed that both sqlalchemy and
            # mongodb can return query result with the same order. Just
            # like the order they're inserted. Actually, sqlalchemy can't
            # guarantee that. So we're leveraging the relationship between
            # pool weight and the index of pools fixture to get the
            # right pool to verify.
            expect = expected[s['weight']]
            path, weight = expect[:2]
            self._pool_expect(s, path, weight, self.doc['uri'])
            if detailed:
                self.assertIn('options', s)
                self.assertEqual(expect[-1], s['options'])
            else:
                self.assertNotIn('options', s)
def test_listing_works(self):
    """Default listing (10 pools, page size 10) paginates correctly."""
    self._listing_test(count=10, limit=10)
def test_detailed_listing_works(self):
    """Detailed listing includes each pool's options."""
    self._listing_test(count=10, limit=10, detailed=True)
@ddt.data(1, 5, 10, 15)
def test_listing_works_with_limit(self, limit):
    """Pagination behaves for page sizes below, at, and above the total."""
    self._listing_test(limit=limit, count=15)
def test_listing_marker_is_respected(self):
    """A marker skips every pool up to and including the marked one."""
    self.simulate_delete(self.pool)

    with pools(self, 10, self.doc['uri']) as expected:
        raw = self.simulate_get(self.url_prefix + '/pools',
                                query_string='marker=3')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        listed = jsonutils.loads(raw[0])['pools']
        self.assertEqual(6, len(listed))

        path, weight = expected[4][:2]
        self._pool_expect(listed[0], path, weight, self.doc['uri'])

View File

@ -1,398 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from unittest import mock
import ddt
import falcon
from oslo_serialization import jsonutils
from zaqar.storage import errors as storage_errors
from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base
@ddt.ddt
class TestQueueLifecycleMongoDB(base.V1Base):
    """End-to-end v1 queue lifecycle tests against the MongoDB driver."""

    config_file = 'wsgi_mongodb.conf'

    @testing.requires_mongodb
    def setUp(self):
        super(TestQueueLifecycleMongoDB, self).setUp()

        # Commonly-used queue URIs shared by the tests below.
        self.queue_path = self.url_prefix + '/queues'
        self.gumshoe_queue_path = self.queue_path + '/gumshoe'
        self.fizbat_queue_path = self.queue_path + '/fizbat'
        self.fizbat_queue_path_metadata = self.fizbat_queue_path + '/metadata'

    def tearDown(self):
        # Drop the control-plane database and every message database so
        # no state leaks between tests.
        storage = self.boot.storage._storage
        connection = storage.connection

        connection.drop_database(self.boot.control.queues_database)

        for db in storage.message_databases:
            connection.drop_database(db)

        super(TestQueueLifecycleMongoDB, self).tearDown()

    def test_empty_project_id(self):
        """An empty X-Project-ID is rejected for every verb."""
        self.simulate_get(self.gumshoe_queue_path, '')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_put(self.gumshoe_queue_path, '')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_head(self.gumshoe_queue_path, '')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_delete(self.gumshoe_queue_path, '')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data('480924', 'foo', None)
    def test_basics_thoroughly(self, project_id):
        """Full create/inspect/delete cycle for a queue, per project id."""
        gumshoe_queue_path_metadata = self.gumshoe_queue_path + '/metadata'
        gumshoe_queue_path_stats = self.gumshoe_queue_path + '/stats'

        # Stats not found - queue not created yet
        self.simulate_get(gumshoe_queue_path_stats, project_id)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Metadata not found - queue not created yet
        self.simulate_get(gumshoe_queue_path_metadata, project_id)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Create
        self.simulate_put(self.gumshoe_queue_path, project_id)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        location = self.srmock.headers_dict['Location']
        self.assertEqual(self.gumshoe_queue_path, location)

        # Ensure queue existence
        self.simulate_head(self.gumshoe_queue_path, project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Add metadata
        doc = '{"messages": {"ttl": 600}}'
        self.simulate_put(gumshoe_queue_path_metadata,
                          project_id, body=doc)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Fetch metadata
        result = self.simulate_get(gumshoe_queue_path_metadata,
                                   project_id)
        result_doc = jsonutils.loads(result[0])
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(jsonutils.loads(doc), result_doc)

        # Stats empty queue
        self.simulate_get(gumshoe_queue_path_stats, project_id)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

        # Delete
        self.simulate_delete(self.gumshoe_queue_path, project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Get non-existent queue
        self.simulate_get(self.gumshoe_queue_path, project_id)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Get non-existent stats
        self.simulate_get(gumshoe_queue_path_stats, project_id)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

        # Get non-existent metadata
        self.simulate_get(gumshoe_queue_path_metadata, project_id)
        self.assertEqual(falcon.HTTP_404, self.srmock.status)

    def test_name_restrictions(self):
        """Dashes/underscores are allowed; '@' and over-long names are not."""
        self.simulate_put(self.queue_path + '/Nice-Boat_2')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        self.simulate_put(self.queue_path + '/Nice-Bo@t')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # 65-character name exceeds the queue-name length limit.
        self.simulate_put(self.queue_path + '/_' + 'niceboat' * 8)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_project_id_restriction(self):
        """Over-long project IDs are rejected; charset is unrestricted."""
        muvluv_queue_path = self.queue_path + '/Muv-Luv'

        self.simulate_put(muvluv_queue_path,
                          headers={'X-Project-ID': 'JAM Project' * 24})
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # no charset restrictions
        self.simulate_put(muvluv_queue_path,
                          headers={'X-Project-ID': 'JAM Project'})
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

    def test_non_ascii_name(self):
        """Non-ASCII queue names are rejected for PUT/GET/DELETE."""
        # NOTE(review): the encoding element of each tuple is unused here;
        # presumably it documents the intended charset — confirm.
        test_params = (('/queues/non-ascii-n\u0153me', 'utf-8'),
                       ('/queues/non-ascii-n\xc4me', 'iso8859-1'))

        for uri, enc in test_params:
            uri = self.url_prefix + uri

            self.simulate_put(uri)
            self.assertEqual(falcon.HTTP_400, self.srmock.status)

            self.simulate_get(uri)
            self.assertEqual(falcon.HTTP_400, self.srmock.status)

            self.simulate_delete(uri)
            self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_no_metadata(self):
        """PUT on the metadata resource requires a non-empty body."""
        self.simulate_put(self.fizbat_queue_path)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        self.simulate_put(self.fizbat_queue_path_metadata)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        self.simulate_put(self.fizbat_queue_path_metadata, body='')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    @ddt.data('{', '[]', '.', ' ', '')
    def test_bad_metadata(self, document):
        """Non-JSON-object metadata bodies are rejected with 400."""
        self.simulate_put(self.fizbat_queue_path, '7e55e1a7e')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        self.simulate_put(self.fizbat_queue_path_metadata, '7e55e1a7e',
                          body=document)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_too_much_metadata(self):
        """Metadata just over max_queue_metadata bytes is rejected."""
        self.simulate_put(self.fizbat_queue_path, '7e55e1a7e')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'

        max_size = self.transport_cfg.max_queue_metadata
        # Pad so the formatted document exceeds max_size by a small margin
        # (the offset accounts for template-envelope shrinkage on format()).
        padding_len = max_size - (len(doc) - 10) + 1

        doc = doc.format(pad='x' * padding_len)

        self.simulate_put(self.fizbat_queue_path_metadata, '7e55e1a7e',
                          body=doc)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_way_too_much_metadata(self):
        """Grossly oversized metadata (100x the limit) is rejected."""
        self.simulate_put(self.fizbat_queue_path, '7e55e1a7e')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'

        max_size = self.transport_cfg.max_queue_metadata
        padding_len = max_size * 100

        doc = doc.format(pad='x' * padding_len)

        self.simulate_put(self.fizbat_queue_path_metadata,
                          '7e55e1a7e', body=doc)
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_custom_metadata(self):
        """Metadata just under the size limit round-trips intact."""
        self.simulate_put(self.fizbat_queue_path, '480924')
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # Set
        doc = '{{"messages": {{"ttl": 600}}, "padding": "{pad}"}}'

        max_size = self.transport_cfg.max_queue_metadata
        # Pad so the formatted document stays within max_size.
        padding_len = max_size - (len(doc) - 2)

        doc = doc.format(pad='x' * padding_len)
        self.simulate_put(self.fizbat_queue_path_metadata, '480924', body=doc)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Get
        result = self.simulate_get(self.fizbat_queue_path_metadata, '480924')
        result_doc = jsonutils.loads(result[0])
        self.assertEqual(jsonutils.loads(doc), result_doc)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)

    def test_update_metadata(self):
        """A second metadata PUT replaces the first; GET returns the latest."""
        xyz_queue_path = self.url_prefix + '/queues/xyz'
        xyz_queue_path_metadata = xyz_queue_path + '/metadata'

        # Create
        project_id = '480924'
        self.simulate_put(xyz_queue_path, project_id)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # Set meta
        doc1 = '{"messages": {"ttl": 600}}'
        self.simulate_put(xyz_queue_path_metadata, project_id, body=doc1)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Update
        doc2 = '{"messages": {"ttl": 100}}'
        self.simulate_put(xyz_queue_path_metadata, project_id, body=doc2)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Get
        result = self.simulate_get(xyz_queue_path_metadata, project_id)
        result_doc = jsonutils.loads(result[0])

        self.assertEqual(jsonutils.loads(doc2), result_doc)
        self.assertEqual(xyz_queue_path_metadata,
                         self.srmock.headers_dict['Content-Location'])

    def test_list(self):
        """Listing: empty, limits, project isolation, detail, pagination."""
        arbitrary_number = 644079696574693
        project_id = str(arbitrary_number)

        # NOTE(kgriffs): It's important that this one sort after the one
        # above. This is in order to prove that bug/1236605 is fixed, and
        # stays fixed!
        alt_project_id = str(arbitrary_number + 1)

        # List empty
        self.simulate_get(self.queue_path, project_id)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Payload exceeded
        self.simulate_get(self.queue_path, project_id, query_string='limit=21')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # Create some
        def create_queue(name, project_id, body):
            uri = self.queue_path + '/' + name
            self.simulate_put(uri, project_id)
            self.simulate_put(uri + '/metadata', project_id, body=body)

        create_queue('g1', None, '{"answer": 42}')
        create_queue('g2', None, '{"answer": 42}')

        create_queue('q1', project_id, '{"node": 31}')
        create_queue('q2', project_id, '{"node": 32}')
        create_queue('q3', project_id, '{"node": 33}')

        create_queue('q3', alt_project_id, '{"alt": 1}')

        # List (global queues)
        result = self.simulate_get(self.queue_path, None,
                                   query_string='limit=2&detailed=true')

        result_doc = jsonutils.loads(result[0])
        queues = result_doc['queues']
        self.assertEqual(2, len(queues))

        for queue in queues:
            self.assertEqual({'answer': 42}, queue['metadata'])

        # List (limit)
        result = self.simulate_get(self.queue_path, project_id,
                                   query_string='limit=2')

        result_doc = jsonutils.loads(result[0])
        self.assertEqual(2, len(result_doc['queues']))

        # List (no metadata, get all)
        result = self.simulate_get(self.queue_path,
                                   project_id, query_string='limit=5')

        result_doc = jsonutils.loads(result[0])
        [target, params] = result_doc['links'][0]['href'].split('?')

        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(self.queue_path + '?limit=5',
                         self.srmock.headers_dict['Content-Location'])

        # Ensure we didn't pick up the queue from the alt project.
        queues = result_doc['queues']
        self.assertEqual(3, len(queues))

        for queue in queues:
            self.simulate_get(queue['href'] + '/metadata', project_id)
            self.assertEqual(falcon.HTTP_200, self.srmock.status)

            self.simulate_get(queue['href'] + '/metadata', 'imnothere')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)

            self.assertNotIn('metadata', queue)

        # List with metadata
        result = self.simulate_get(self.queue_path, project_id,
                                   query_string='detailed=true')

        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        result_doc = jsonutils.loads(result[0])
        [target, params] = result_doc['links'][0]['href'].split('?')

        queue = result_doc['queues'][0]

        result = self.simulate_get(queue['href'] + '/metadata', project_id)
        result_doc = jsonutils.loads(result[0])
        self.assertEqual(queue['metadata'], result_doc)
        self.assertEqual({'node': 31}, result_doc)

        # List tail
        self.simulate_get(target, project_id, query_string=params)
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # List manually-constructed tail
        self.simulate_get(target, project_id, query_string='marker=zzz')
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_list_returns_503_on_nopoolfound_exception(self):
        """A NoPoolFound raised mid-iteration surfaces as HTTP 503."""
        arbitrary_number = 644079696574693
        project_id = str(arbitrary_number)
        header = {
            'X-Project-ID': project_id,
        }

        queue_controller = self.boot.storage.queue_controller

        with mock.patch.object(queue_controller, 'list') as mock_queue_list:

            def queue_generator():
                raise storage_errors.NoPoolFound()

            # This generator tries to be like queue controller list generator
            # in some ways.
            def fake_generator():
                yield queue_generator()
                yield {}
            mock_queue_list.return_value = fake_generator()
            self.simulate_get(self.queue_path, headers=header)
            self.assertEqual(falcon.HTTP_503, self.srmock.status)
class TestQueueLifecycleFaultyDriver(base.V1BaseFaulty):
    """Queue operations must surface 503 when the storage driver is faulty."""

    config_file = 'wsgi_faulty.conf'

    def test_simple(self):
        path = self.url_prefix + '/queues/gumshoe'
        project = '480924'
        doc = '{"messages": {"ttl": 600}}'

        # Create fails and must not advertise a Location.
        self.simulate_put(path, project, body=doc)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)
        self.assertNotIn(('Location', path), self.srmock.headers)

        # Metadata fetch fails and returns no usable document.
        raw = self.simulate_get(path + '/metadata', project)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)
        self.assertNotEqual(jsonutils.loads(raw[0]), jsonutils.loads(doc))

        # Stats, listing, and delete all fail the same way.
        self.simulate_get(path + '/stats', project)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

        self.simulate_get(self.url_prefix + '/queues', project)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

        self.simulate_delete(path, project)
        self.assertEqual(falcon.HTTP_503, self.srmock.status)

View File

@ -1,127 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
import falcon
from zaqar.tests.unit.transport.wsgi import base
class TestValidation(base.V1Base):
    """Request-validation tests for the v1 API (size limits, Client-ID)."""

    config_file = 'wsgi_mongodb_validation.conf'

    def setUp(self):
        super(TestValidation, self).setUp()

        self.project_id = '7e55e1a7e'
        self.queue_path = self.url_prefix + '/queues/noein'

        # Pre-create the queue every test operates on.
        self.simulate_put(self.queue_path, self.project_id)

        self.headers = {
            'Client-ID': uuidutils.generate_uuid(),
        }

    def tearDown(self):
        self.simulate_delete(self.queue_path, self.project_id)
        super(TestValidation, self).tearDown()

    def test_metadata_deserialization(self):
        """Metadata up to the configured size passes; one byte over fails."""
        # Normal case
        self.simulate_put(self.queue_path + '/metadata',
                          self.project_id,
                          body='{"timespace": "Shangri-la"}')

        self.assertEqual(falcon.HTTP_204, self.srmock.status)

        # Too long
        # NOTE(review): 64 mirrors max_queue_metadata in
        # wsgi_mongodb_validation.conf — confirm they stay in sync.
        max_queue_metadata = 64

        doc_tmpl = '{{"Dragon Torc":"{0}"}}'
        doc_tmpl_ws = '{{ "Dragon Torc" : "{0}" }}'  # with whitespace
        envelope_length = len(doc_tmpl.format(''))

        for tmpl in doc_tmpl, doc_tmpl_ws:
            # One byte past the limit after accounting for the envelope.
            gen = '0' * (max_queue_metadata - envelope_length + 1)
            doc = tmpl.format(gen)
            self.simulate_put(self.queue_path + '/metadata',
                              self.project_id,
                              body=doc)

            self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_message_deserialization(self):
        """Messages within the post-size limit pass; oversized ones fail."""
        # Normal case
        self.simulate_post(self.queue_path + '/messages',
                           self.project_id,
                           body='[{"body": "Dragon Knights", "ttl": 100}]',
                           headers=self.headers)

        self.assertEqual(falcon.HTTP_201, self.srmock.status)

        # Both messages' size are too long
        max_messages_post_size = 256

        obj = {'a': 0, 'b': ''}
        envelope_length = len(jsonutils.dumps(obj, separators=(',', ':')))
        obj['b'] = 'x' * (max_messages_post_size - envelope_length + 1)

        for long_body in ('a' * (max_messages_post_size - 2 + 1), obj):
            doc = jsonutils.dumps([{'body': long_body, 'ttl': 100}])
            self.simulate_post(self.queue_path + '/messages',
                               self.project_id,
                               body=doc,
                               headers=self.headers)

            self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_request_without_client_id(self):
        # Unlike newer APIs (v1.1 and v2), there will be no error 400, because
        # of missing Client-ID in headers.
        empty_headers = {}
        self.simulate_put(self.queue_path,
                          self.project_id,
                          headers=empty_headers)
        # Queue was already created by setUp, expecting 204 response code.
        self.assertEqual(falcon.HTTP_204, self.srmock.status)

    def test_request_without_client_id_if_resource_name_contains_v2_text(self):
        """A 'v2' substring in the queue name must not trigger v2 rules."""
        empty_headers = {}
        queue_path_with_v2 = self.url_prefix + '/queues/my_name_is_v2'
        self.simulate_put(queue_path_with_v2,
                          self.project_id,
                          headers=empty_headers)
        self.addCleanup(self.simulate_delete, queue_path_with_v2,
                        self.project_id)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)

    def test_queue_metadata_putting(self):
        # Ensure setting reserved queue attributes (which names start with
        # '_') is not allowed in API v1.

        # Try set real _default_message_ttl queue attribute.
        self.simulate_put(self.queue_path + '/metadata',
                          self.project_id,
                          body='{"_default_message_ttl": 60}')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

        # Try set a fictional queue attribute.
        self.simulate_put(self.queue_path + '/metadata',
                          self.project_id,
                          body='{"_min_message_niceness": 9000}')
        self.assertEqual(falcon.HTTP_400, self.srmock.status)

View File

@ -31,7 +31,6 @@ from zaqar.transport.middleware import auth
from zaqar.transport.middleware import cors
from zaqar.transport.middleware import profile
from zaqar.transport import validation
from zaqar.transport.wsgi import v1_0
from zaqar.transport.wsgi import v1_1
from zaqar.transport.wsgi import v2_0
from zaqar.transport.wsgi import version
@ -108,7 +107,6 @@ class Driver(transport.DriverBase):
"""Initialize hooks and URI routes to resources."""
catalog = [
('/v1', v1_0.public_endpoints(self, self._conf)),
('/v1.1', v1_1.public_endpoints(self, self._conf)),
('/v2', v2_0.public_endpoints(self, self._conf)),
('/', [('', version.Resource())])
@ -116,7 +114,6 @@ class Driver(transport.DriverBase):
if self._conf.admin_mode:
catalog.extend([
('/v1', v1_0.private_endpoints(self, self._conf)),
('/v1.1', v1_1.private_endpoints(self, self._conf)),
('/v2', v2_0.private_endpoints(self, self._conf)),
])

View File

@ -1,110 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from oslo_log import log as logging
from zaqar.common import decorators
from zaqar.transport.wsgi.v1_0 import claims
from zaqar.transport.wsgi.v1_0 import health
from zaqar.transport.wsgi.v1_0 import homedoc
from zaqar.transport.wsgi.v1_0 import messages
from zaqar.transport.wsgi.v1_0 import metadata
from zaqar.transport.wsgi.v1_0 import pools
from zaqar.transport.wsgi.v1_0 import queues
from zaqar.transport.wsgi.v1_0 import stats
LOG = logging.getLogger(__name__)

# Version catalog entry advertised to clients (e.g. via the version
# resource). v1.0 is marked DEPRECATED in favor of newer API versions.
VERSION = {
    'id': '1',
    'status': 'DEPRECATED',
    'updated': '2014-9-11T17:47:05Z',
    'media-types': [
        {
            'base': 'application/json',
            'type': 'application/vnd.openstack.messaging-v1+json'
        }
    ],
    'links': [
        {
            'href': '/v1/',
            'rel': 'self'
        }
    ]
}
@decorators.api_version_manager(VERSION)
def public_endpoints(driver, conf):
    """Return the public (tenant-facing) v1.0 route table.

    Each entry maps a URI template to a resource instance wired to the
    driver's storage controllers.
    """
    queue_ctrl = driver._storage.queue_controller
    message_ctrl = driver._storage.message_controller
    claim_ctrl = driver._storage.claim_controller

    # Home document
    routes = [('/', homedoc.Resource())]

    # Queues
    routes += [
        ('/queues',
         queues.CollectionResource(driver._validate, queue_ctrl)),
        ('/queues/{queue_name}',
         queues.ItemResource(queue_ctrl, message_ctrl)),
        ('/queues/{queue_name}/stats',
         stats.Resource(queue_ctrl)),
        ('/queues/{queue_name}/metadata',
         metadata.Resource(driver._wsgi_conf, driver._validate,
                           queue_ctrl)),
    ]

    # Messages
    routes += [
        ('/queues/{queue_name}/messages',
         messages.CollectionResource(driver._wsgi_conf, driver._validate,
                                     message_ctrl)),
        ('/queues/{queue_name}/messages/{message_id}',
         messages.ItemResource(message_ctrl)),
    ]

    # Claims
    routes += [
        ('/queues/{queue_name}/claims',
         claims.CollectionResource(driver._wsgi_conf, driver._validate,
                                   claim_ctrl)),
        ('/queues/{queue_name}/claims/{claim_id}',
         claims.ItemResource(driver._wsgi_conf, driver._validate,
                             claim_ctrl)),
    ]

    # Health
    routes.append(('/health', health.Resource(driver._storage)))

    return routes
@decorators.api_version_manager(VERSION)
def private_endpoints(driver, conf):
    """Return the admin-only v1.0 routes (pool management).

    Empty unless pooling is enabled in the configuration.
    """
    endpoints = []

    if conf.pooling:
        controller = driver._control.pools_controller
        endpoints.append(('/pools', pools.Listing(controller)))
        endpoints.append(('/pools/{pool}', pools.Resource(controller)))

    return endpoints

View File

@ -1,172 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from oslo_log import log as logging
from zaqar.common import decorators
from zaqar.i18n import _
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)

# Sanitizer specs: (field, type, default) triples for claim bodies.
# NOTE(review): a None default presumably means the field is required
# by wsgi_utils.sanitize; on PATCH, 'grace' falls back to 0 — confirm.
CLAIM_POST_SPEC = (('ttl', int, None), ('grace', int, None))
CLAIM_PATCH_SPEC = (('ttl', int, None), ('grace', int, 0))


class Resource(object):
    """Base for claim resources: holds the controller and validator."""

    __slots__ = ('_claim_controller', '_validate')

    def __init__(self, wsgi_conf, validate, claim_controller):
        # wsgi_conf is accepted for signature parity but not stored here.
        self._claim_controller = claim_controller
        self._validate = validate
class CollectionResource(Resource):
    """POST endpoint that claims a batch of messages from a queue."""

    @decorators.TransportLog("Claims collection")
    def on_post(self, req, resp, project_id, queue_name):
        """Claim messages; 201 with the claimed set, or 204 if none.

        Raises HTTP 400 on validation failure and 503 on storage errors.
        """
        # Check for an explicit limit on the # of messages to claim
        limit = req.get_param_as_int('limit')
        claim_options = {} if limit is None else {'limit': limit}

        # Read claim metadata (e.g., TTL) and raise appropriate
        # HTTP errors as needed.
        document = wsgi_utils.deserialize(req.stream, req.content_length)
        metadata = wsgi_utils.sanitize(document, CLAIM_POST_SPEC)

        # Claim some messages
        try:
            self._validate.claim_creation(metadata, limit=limit)
            cid, msgs = self._claim_controller.create(
                queue_name,
                metadata=metadata,
                project=project_id,
                **claim_options)

            # Buffer claimed messages
            # TODO(kgriffs): optimize, along with serialization (below)
            resp_msgs = list(msgs)

        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))

        except Exception:
            description = _('Claim could not be created.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

        # Serialize claimed messages, if any. This logic assumes
        # the storage driver returned well-formed messages.
        if len(resp_msgs) != 0:
            resp_msgs = [wsgi_utils.format_message_v1(
                msg, req.path.rpartition('/')[0], cid) for msg in resp_msgs]

            resp.location = req.path + '/' + cid
            resp.text = utils.to_json(resp_msgs)
            resp.status = falcon.HTTP_201
        else:
            resp.status = falcon.HTTP_204
class ItemResource(Resource):
    """Operations on a single claim: query, renew (PATCH), and release."""

    __slots__ = ('_claim_controller', '_validate')

    def __init__(self, wsgi_conf, validate, claim_controller):
        # wsgi_conf is accepted for signature parity but not stored here.
        self._claim_controller = claim_controller
        self._validate = validate

    @decorators.TransportLog("Claim item")
    def on_get(self, req, resp, project_id, queue_name, claim_id):
        """Return the claim's metadata plus its claimed messages (200)."""
        try:
            meta, msgs = self._claim_controller.get(
                queue_name,
                claim_id=claim_id,
                project=project_id)

            # Buffer claimed messages
            # TODO(kgriffs): Optimize along with serialization (see below)
            meta['messages'] = list(msgs)

        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except Exception:
            description = _('Claim could not be queried.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

        # Serialize claimed messages
        # TODO(kgriffs): Optimize
        meta['messages'] = [wsgi_utils.format_message_v1(
            msg, req.path.rsplit('/', 2)[0], meta['id'])
            for msg in meta['messages']]

        # Expose the claim's own href instead of its internal id.
        meta['href'] = req.path
        del meta['id']

        resp.content_location = req.relative_uri
        resp.text = utils.to_json(meta)
        # status defaults to 200

    @decorators.TransportLog("Claim item")
    def on_patch(self, req, resp, project_id, queue_name, claim_id):
        """Renew/update a claim's TTL; 204 on success."""
        # Read claim metadata (e.g., TTL) and raise appropriate
        # HTTP errors as needed.
        document = wsgi_utils.deserialize(req.stream, req.content_length)
        metadata = wsgi_utils.sanitize(document, CLAIM_PATCH_SPEC)

        try:
            self._validate.claim_updating(metadata)
            self._claim_controller.update(queue_name,
                                          claim_id=claim_id,
                                          metadata=metadata,
                                          project=project_id)

            resp.status = falcon.HTTP_204

        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))

        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))

        except Exception:
            description = _('Claim could not be updated.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

    @decorators.TransportLog("Claim item")
    def on_delete(self, req, resp, project_id, queue_name, claim_id):
        """Release the claim; 204 on success, 503 on storage errors."""
        try:
            self._claim_controller.delete(queue_name,
                                          claim_id=claim_id,
                                          project=project_id)

            resp.status = falcon.HTTP_204

        except Exception:
            description = _('Claim could not be deleted.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -1,30 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import falcon
class Resource(object):
    """Health-check endpoint backed by the storage driver's liveness probe."""

    __slots__ = ('_driver',)

    def __init__(self, driver):
        self._driver = driver

    def on_get(self, req, resp, **kwargs):
        # 204 when the storage driver reports itself alive, else 503.
        if self._driver.is_alive():
            resp.status = falcon.HTTP_204
        else:
            resp.status = falcon.HTTP_503

    def on_head(self, req, resp, **kwargs):
        # HEAD only confirms the endpoint itself is reachable.
        resp.status = falcon.HTTP_204

View File

@ -1,142 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from oslo_serialization import jsonutils
# NOTE(kgriffs): http://tools.ietf.org/html/draft-nottingham-json-home-03
# Static JSON Home document describing the v1 public API: each relation
# maps to a URI template, its variables, and the allowed methods.
JSON_HOME = {
    'resources': {
        # -----------------------------------------------------------------
        # Queues
        # -----------------------------------------------------------------
        'rel/queues': {
            'href-template': '/v1/queues{?marker,limit,detailed}',
            'href-vars': {
                'marker': 'param/marker',
                'limit': 'param/queue_limit',
                'detailed': 'param/detailed',
            },
            'hints': {
                'allow': ['GET'],
                'formats': {
                    'application/json': {},
                },
            },
        },
        'rel/queue': {
            'href-template': '/v1/queues/{queue_name}',
            'href-vars': {
                'queue_name': 'param/queue_name',
            },
            'hints': {
                'allow': ['GET', 'HEAD', 'PUT', 'DELETE'],
                'formats': {
                    'application/json': {},
                },
            },
        },
        'rel/queue-metadata': {
            'href-template': '/v1/queues/{queue_name}/metadata',
            'href-vars': {
                'queue_name': 'param/queue_name',
            },
            'hints': {
                'allow': ['GET', 'PUT'],
                'formats': {
                    'application/json': {},
                },
            },
        },
        'rel/queue-stats': {
            'href-template': '/v1/queues/{queue_name}/stats',
            'href-vars': {
                'queue_name': 'param/queue_name',
            },
            'hints': {
                'allow': ['GET'],
                'formats': {
                    'application/json': {},
                },
            },
        },

        # -----------------------------------------------------------------
        # Messages
        # -----------------------------------------------------------------
        'rel/messages': {
            'href-template': ('/v1/queues/{queue_name}/messages'
                              '{?marker,limit,echo,include_claimed}'),
            'href-vars': {
                'queue_name': 'param/queue_name',
                'marker': 'param/marker',
                'limit': 'param/messages_limit',
                'echo': 'param/echo',
                'include_claimed': 'param/include_claimed',
            },
            'hints': {
                'allow': ['GET'],
                'formats': {
                    'application/json': {},
                },
            },
        },
        'rel/post-messages': {
            'href-template': '/v1/queues/{queue_name}/messages',
            'href-vars': {
                'queue_name': 'param/queue_name',
            },
            'hints': {
                'allow': ['POST'],
                'formats': {
                    'application/json': {},
                },
                'accept-post': ['application/json'],
            },
        },

        # -----------------------------------------------------------------
        # Claims
        # -----------------------------------------------------------------
        'rel/claim': {
            'href-template': '/v1/queues/{queue_name}/claims{?limit}',
            'href-vars': {
                'queue_name': 'param/queue_name',
                'limit': 'param/claim_limit',
            },
            'hints': {
                'allow': ['POST'],
                'formats': {
                    'application/json': {},
                },
                'accept-post': ['application/json']
            },
        },
    }
}
class Resource(object):
    """Serves the static v1 JSON Home document."""

    def __init__(self):
        # Serialize once at startup; the document never changes.
        body = jsonutils.dumps(JSON_HOME, ensure_ascii=False, indent=4)
        self.document_utf8 = body.encode('utf-8')

    def on_get(self, req, resp, project_id):
        resp.data = self.document_utf8
        resp.content_type = 'application/json-home'
        resp.cache_control = ['max-age=86400']
        # status defaults to 200

View File

@ -1,300 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from oslo_log import log as logging
from zaqar.common import decorators
from zaqar.common.transport.wsgi import helpers as wsgi_helpers
from zaqar.i18n import _
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
MESSAGE_POST_SPEC = (('ttl', int, None), ('body', '*', None))
class CollectionResource(object):
    """WSGI resource for the v1 messages collection.

    Handles POST (enqueue), GET (list or bulk-get by ID), and DELETE
    (bulk-delete by ID) on ``/v1/queues/{queue_name}/messages``.
    """
    # __slots__: one long-lived instance serves all requests; no __dict__.
    __slots__ = ('_message_controller', '_wsgi_conf', '_validate')
    def __init__(self, wsgi_conf, validate, message_controller):
        # wsgi_conf: transport configuration; validate: request validator;
        # message_controller: storage-layer message operations.
        self._wsgi_conf = wsgi_conf
        self._validate = validate
        self._message_controller = message_controller
    # ----------------------------------------------------------------------
    # Helpers
    # ----------------------------------------------------------------------
    def _get_by_id(self, base_path, project_id, queue_name, ids):
        """Returns one or more messages from the queue by ID.

        Returns ``None`` when none of the requested IDs exist; otherwise a
        list of messages formatted with v1 hrefs rooted at *base_path*.
        Raises HTTPBadRequestAPI on validation failure and
        HTTPServiceUnavailable on any other storage error.
        """
        try:
            # Reuse the listing limit check to cap the number of IDs.
            self._validate.message_listing(limit=len(ids))
            messages = self._message_controller.bulk_get(
                queue_name,
                message_ids=ids,
                project=project_id)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except Exception:
            description = _('Message could not be retrieved.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        # Prepare response
        messages = list(messages)
        if not messages:
            return None
        return [wsgi_utils.format_message_v1(m, base_path) for m in messages]
    def _get(self, req, project_id, queue_name):
        """Lists messages with paging; returns ``None`` for an empty page.

        On success returns a dict with the formatted ``messages`` and a
        ``links`` entry carrying the "next" page query string.
        """
        client_uuid = wsgi_helpers.get_client_uuid(req)
        kwargs = {}
        # NOTE(kgriffs): This syntax ensures that
        # we don't clobber default values with None.
        req.get_param('marker', store=kwargs)
        req.get_param_as_int('limit', store=kwargs)
        req.get_param_as_bool('echo', store=kwargs)
        req.get_param_as_bool('include_claimed', store=kwargs)
        try:
            self._validate.message_listing(**kwargs)
            results = self._message_controller.list(
                queue_name,
                project=project_id,
                client_uuid=client_uuid,
                **kwargs)
            # Buffer messages
            cursor = next(results)
            messages = list(cursor)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except Exception:
            description = _('Messages could not be listed.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        if not messages:
            return None
        # Found some messages, so prepare the response
        # (the second item yielded by the controller is the next-page marker)
        kwargs['marker'] = next(results)
        base_path = req.path.rsplit('/', 1)[0]
        messages = [wsgi_utils.format_message_v1(
            m, base_path) for m in messages]
        return {
            'messages': messages,
            'links': [
                {
                    'rel': 'next',
                    'href': req.path + falcon.to_query_str(kwargs)
                }
            ]
        }
    # ----------------------------------------------------------------------
    # Interface
    # ----------------------------------------------------------------------
    @decorators.TransportLog("Messages collection")
    def on_post(self, req, resp, project_id, queue_name):
        """Enqueues one or more messages; responds 201 with their hrefs."""
        client_uuid = wsgi_helpers.get_client_uuid(req)
        try:
            # Place JSON size restriction before parsing
            self._validate.message_length(req.content_length)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        # Deserialize and validate the request body
        document = wsgi_utils.deserialize(req.stream, req.content_length)
        messages = wsgi_utils.sanitize(document, MESSAGE_POST_SPEC,
                                       doctype=wsgi_utils.JSONArray)
        try:
            self._validate.message_posting(messages)
            message_ids = self._message_controller.post(
                queue_name,
                messages=messages,
                project=project_id,
                client_uuid=client_uuid)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except storage_errors.MessageConflict:
            description = _('No messages could be enqueued.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        except Exception:
            description = _('Messages could not be enqueued.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        # Prepare the response
        ids_value = ','.join(message_ids)
        resp.location = req.path + '?ids=' + ids_value
        hrefs = [req.path + '/' + id for id in message_ids]
        # NOTE(kgriffs): As of the Icehouse release, drivers are
        # no longer allowed to enqueue a subset of the messages
        # submitted by the client; it's all or nothing. Therefore,
        # 'partial' is now always False in the v1.0 API, and the
        # field has been removed in v1.1.
        body = {'resources': hrefs, 'partial': False}
        resp.text = utils.to_json(body)
        resp.status = falcon.HTTP_201
    @decorators.TransportLog("Messages collection")
    def on_get(self, req, resp, project_id, queue_name):
        """Lists messages, or bulk-gets them when ``ids`` is given.

        Responds 204 when there is nothing to return.
        """
        resp.content_location = req.relative_uri
        ids = req.get_param_as_list('ids')
        if ids is None:
            response = self._get(req, project_id, queue_name)
        else:
            response = self._get_by_id(req.path.rsplit('/', 1)[0], project_id,
                                       queue_name, ids)
        if response is None:
            resp.status = falcon.HTTP_204
            return
        resp.text = utils.to_json(response)
        # status defaults to 200
    @decorators.TransportLog("Messages collection")
    def on_delete(self, req, resp, project_id, queue_name):
        """Bulk-deletes the messages named by the required ``ids`` param."""
        # NOTE(zyuan): Attempt to delete the whole message collection
        # (without an "ids" parameter) is not allowed
        ids = req.get_param_as_list('ids', required=True)
        try:
            self._validate.message_listing(limit=len(ids))
            self._message_controller.bulk_delete(
                queue_name,
                message_ids=ids,
                project=project_id)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except Exception:
            description = _('Messages could not be deleted.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        resp.status = falcon.HTTP_204
class ItemResource(object):
    """WSGI resource for a single message.

    Handles GET and DELETE on
    ``/v1/queues/{queue_name}/messages/{message_id}``.
    """
    __slots__ = '_message_controller'
    def __init__(self, message_controller):
        # message_controller: storage-layer message operations.
        self._message_controller = message_controller
    @decorators.TransportLog("Messages item")
    def on_get(self, req, resp, project_id, queue_name, message_id):
        """Returns the message body; 404 if it does not exist."""
        try:
            message = self._message_controller.get(
                queue_name,
                message_id,
                project=project_id)
        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except Exception:
            description = _('Message could not be retrieved.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        resp.content_location = req.relative_uri
        # Hrefs are rooted at the queue path (two segments up from
        # .../messages/{message_id}).
        message = wsgi_utils.format_message_v1(
            message, req.path.rsplit('/', 2)[0])
        resp.text = utils.to_json(message)
        # status defaults to 200
    @decorators.TransportLog("Messages item")
    def on_delete(self, req, resp, project_id, queue_name, message_id):
        """Deletes the message, honoring an optional ``claim_id`` param.

        A claimed message can only be deleted with its valid claim ID;
        the three storage claim errors map to 400/400/403 respectively.
        """
        error_title = _('Unable to delete')
        try:
            self._message_controller.delete(
                queue_name,
                message_id=message_id,
                project=project_id,
                claim=req.get_param('claim_id'))
        except storage_errors.MessageNotClaimed as ex:
            LOG.debug(ex)
            description = _('A claim was specified, but the message '
                            'is not currently claimed.')
            raise falcon.HTTPBadRequest(
                title=error_title, description=description)
        except storage_errors.ClaimDoesNotExist as ex:
            LOG.debug(ex)
            description = _('The specified claim does not exist or '
                            'has expired.')
            raise falcon.HTTPBadRequest(
                title=error_title, description=description)
        except storage_errors.NotPermitted as ex:
            LOG.debug(ex)
            description = _('This message is claimed; it cannot be '
                            'deleted without a valid claim ID.')
            raise falcon.HTTPForbidden(
                title=error_title, description=description)
        except Exception:
            description = _('Message could not be deleted.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        # Alles guete
        resp.status = falcon.HTTP_204

View File

@ -1,95 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from oslo_log import log as logging
from zaqar.common import decorators
from zaqar.i18n import _
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = logging.getLogger(__name__)
class Resource(object):
    """Handler for a queue's metadata document in the v1 API."""

    __slots__ = ('_wsgi_conf', '_validate', '_queue_ctrl')

    def __init__(self, _wsgi_conf, validate, queue_controller):
        self._wsgi_conf = _wsgi_conf
        self._validate = validate
        self._queue_ctrl = queue_controller

    @decorators.TransportLog("Queue metadata")
    def on_get(self, req, resp, project_id, queue_name):
        """Return the queue's metadata as a JSON document."""
        try:
            metadata = self._queue_ctrl.get_metadata(queue_name,
                                                     project=project_id)
        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except Exception:
            description = _('Queue metadata could not be retrieved.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        resp.content_location = req.path
        resp.text = utils.to_json(metadata)
        # status defaults to 200

    @decorators.TransportLog("Queue metadata")
    def on_put(self, req, resp, project_id, queue_name):
        """Replace the queue's metadata with the request body."""
        try:
            # Reject oversized bodies before attempting to parse them.
            self._validate.queue_metadata_length(req.content_length)
            document = wsgi_utils.deserialize(req.stream, req.content_length)
            metadata = wsgi_utils.sanitize(document)
            # Keys beginning with "_" are reserved and may not be set
            # through the v1 API.
            if any(key.startswith('_') for key in metadata):
                description = _('Reserved queue attributes in metadata '
                                '(which names start with "_") can not be '
                                'set in API v1.')
                raise validation.ValidationFailed(description)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        try:
            self._queue_ctrl.set_metadata(queue_name,
                                          metadata=metadata,
                                          project=project_id)
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except storage_errors.QueueDoesNotExist as ex:
            raise wsgi_errors.HTTPNotFound(str(ex))
        except Exception:
            description = _('Metadata could not be updated.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        resp.status = falcon.HTTP_204
        resp.location = req.path

View File

@ -1,234 +0,0 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""pools: a resource to handle storage pool management
A pool is added by an operator by interacting with the
pooling-related endpoints. When specifying a pool, the
following fields are required:
::
{
"name": string,
"weight": integer,
"uri": string::uri
}
Furthermore, depending on the underlying storage type of pool being
registered, there is an optional field::
{
"options": {...}
}
"""
import falcon
import jsonschema
from oslo_log import log
from zaqar.common.api.schemas import pools as schema
from zaqar.common import utils as common_utils
from zaqar.storage import errors
from zaqar.storage import utils as storage_utils
from zaqar.transport import utils as transport_utils
from zaqar.transport.wsgi import errors as wsgi_errors
from zaqar.transport.wsgi import utils as wsgi_utils
LOG = log.getLogger(__name__)
class Listing(object):
    """A resource to list registered pools

    :param pools_controller: means to interact with storage
    """

    def __init__(self, pools_controller):
        self._ctrl = pools_controller

    def on_get(self, request, response, project_id):
        """Returns a pool listing as objects embedded in an object:

        ::

            {
                "pools": [
                    {"href": "", "weight": 100, "uri": ""},
                    ...
                ],
                "links": [
                    {"href": "", "rel": "next"}
                ]
            }

        :returns: HTTP | 200
        """
        LOG.debug('LIST pools')
        params = {}
        # Copy only the params the client actually sent so controller
        # defaults are not clobbered with None.
        request.get_param('marker', store=params)
        request.get_param_as_int('limit', store=params)
        request.get_param_as_bool('detailed', store=params)
        cursor = self._ctrl.list(**params)
        pools = list(next(cursor))
        results = {}
        if pools:
            # The cursor's second item is the marker for the next page.
            params['marker'] = next(cursor)
            for entry in pools:
                entry['href'] = request.path + '/' + entry['name']
            results['links'] = [{
                'rel': 'next',
                'href': request.path + falcon.to_query_str(params),
            }]
            results['pools'] = pools
        response.content_location = request.relative_uri
        response.text = transport_utils.to_json(results)
        response.status = falcon.HTTP_200
class Resource(object):
    """A handler for individual pool.
    :param pools_controller: means to interact with storage
    """
    def __init__(self, pools_controller):
        self._ctrl = pools_controller
        # One pre-built Draft 4 validator per patchable field, plus one
        # for the full creation document.
        validator_type = jsonschema.Draft4Validator
        self._validators = {
            'weight': validator_type(schema.patch_weight),
            'uri': validator_type(schema.patch_uri),
            'options': validator_type(schema.patch_options),
            'create': validator_type(schema.create)
        }
    def on_get(self, request, response, project_id, pool):
        """Returns a JSON object for a single pool entry:
        ::
            {"weight": 100, "uri": "", options: {...}}
        :returns: HTTP | [200, 404]
        """
        LOG.debug('GET pool - name: %s', pool)
        data = None
        detailed = request.get_param_as_bool('detailed') or False
        try:
            data = self._ctrl.get(pool, detailed)
        except errors.PoolDoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        data['href'] = request.path
        response.text = transport_utils.to_json(data)
        response.content_location = request.relative_uri
    def on_put(self, request, response, project_id, pool):
        """Registers a new pool. Expects the following input:
        ::
            {"weight": 100, "uri": ""}
        An options object may also be provided.
        :returns: HTTP | [201, 204]
        """
        LOG.debug('PUT pool - name: %s', pool)
        conf = self._ctrl.driver.conf
        data = wsgi_utils.load(request)
        wsgi_utils.validate(self._validators['create'], data)
        # Reject the registration outright if the storage URI is not
        # reachable from this node.
        if not storage_utils.can_connect(data['uri'], conf=conf):
            raise wsgi_errors.HTTPBadRequestBody(
                'cannot connect to %s' % data['uri']
            )
        try:
            self._ctrl.create(pool, weight=data['weight'],
                              uri=data['uri'],
                              options=data.get('options', {}))
            response.status = falcon.HTTP_201
            response.location = request.path
        except errors.PoolAlreadyExists as e:
            LOG.exception('Pool "%s" already exists', pool)
            raise wsgi_errors.HTTPConflict(str(e))
    def on_delete(self, request, response, project_id, pool):
        """Deregisters a pool.
        :returns: HTTP | 204
        """
        LOG.debug('DELETE pool - name: %s', pool)
        self._ctrl.delete(pool)
        response.status = falcon.HTTP_204
    def on_patch(self, request, response, project_id, pool):
        """Allows one to update a pool's weight, uri, and/or options.
        This method expects the user to submit a JSON object
        containing at least one of: 'uri', 'weight', 'options'. If
        none are found, the request is flagged as bad. There is also
        strict format checking through the use of
        jsonschema. Appropriate errors are returned in each case for
        badly formatted input.
        :returns: HTTP | 200,400
        """
        LOG.debug('PATCH pool - name: %s', pool)
        data = wsgi_utils.load(request)
        EXPECT = ('weight', 'uri', 'options')
        if not any([(field in data) for field in EXPECT]):
            LOG.debug('PATCH pool, bad params')
            raise wsgi_errors.HTTPBadRequestBody(
                'One of `uri`, `weight`, or `options` needs '
                'to be specified'
            )
        # NOTE(review): every patch validator runs against the document,
        # even for fields absent from the body — presumably the patch
        # schemas tolerate missing fields; confirm against the schema
        # definitions.
        for field in EXPECT:
            wsgi_utils.validate(self._validators[field], data)
        conf = self._ctrl.driver.conf
        if 'uri' in data and not storage_utils.can_connect(data['uri'],
                                                           conf=conf):
            raise wsgi_errors.HTTPBadRequestBody(
                'cannot connect to %s' % data['uri']
            )
        # Forward only the fields that were actually supplied.
        fields = common_utils.fields(data, EXPECT,
                                     pred=lambda v: v is not None)
        try:
            self._ctrl.update(pool, **fields)
        except errors.PoolDoesNotExist as ex:
            LOG.exception('Pool "%s" does not exist', pool)
            raise wsgi_errors.HTTPNotFound(str(ex))

View File

@ -1,132 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import falcon
from oslo_log import log as logging
from zaqar.common import decorators
from zaqar.i18n import _
from zaqar.transport import utils
from zaqar.transport import validation
from zaqar.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class ItemResource(object):
    """Handler for a single queue: create, existence check, delete."""

    __slots__ = ('_queue_controller', '_message_controller')

    def __init__(self, queue_controller, message_controller):
        self._queue_controller = queue_controller
        self._message_controller = message_controller

    @decorators.TransportLog("Queue item")
    def on_put(self, req, resp, project_id, queue_name):
        """Create the queue; 201 if newly created, 204 if it existed."""
        try:
            created = self._queue_controller.create(
                queue_name, project=project_id)
        except Exception:
            description = _('Queue could not be created.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        if created:
            resp.status = falcon.HTTP_201
        else:
            resp.status = falcon.HTTP_204
        resp.location = req.path

    @decorators.TransportLog("Queue item")
    def on_head(self, req, resp, project_id, queue_name):
        """Report queue existence: 204 when present, 404 otherwise."""
        exists = self._queue_controller.exists(queue_name, project=project_id)
        resp.status = falcon.HTTP_204 if exists else falcon.HTTP_404
        resp.content_location = req.path

    # GET is defined to behave exactly like HEAD in the v1 API.
    on_get = on_head

    @decorators.TransportLog("Queue item")
    def on_delete(self, req, resp, project_id, queue_name):
        """Delete the queue; responds 204 on success."""
        try:
            self._queue_controller.delete(queue_name, project=project_id)
        except Exception:
            description = _('Queue could not be deleted.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        resp.status = falcon.HTTP_204
class CollectionResource(object):
    """Handler for listing the queues that belong to a project."""

    __slots__ = ('_queue_controller', '_validate')

    def __init__(self, validate, queue_controller):
        self._queue_controller = queue_controller
        self._validate = validate

    def on_get(self, req, resp, project_id):
        """List queues with paging; 204 when the project has none."""
        LOG.debug('Queue collection GET')
        params = {}
        # Copy only the params the client actually sent so controller
        # defaults are not clobbered with None.
        req.get_param('marker', store=params)
        req.get_param_as_int('limit', store=params)
        req.get_param_as_bool('detailed', store=params)
        try:
            self._validate.queue_listing(**params)
            results = self._queue_controller.list(project=project_id,
                                                  **params)
            # Buffer the first yielded item (the page of queues).
            queues = list(next(results))
        except validation.ValidationFailed as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPBadRequestAPI(str(ex))
        except Exception:
            description = _('Queues could not be listed.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)
        if not queues:
            # Nothing to return; v1 signals an empty listing with 204.
            resp.status = falcon.HTTP_204
            return
        # The generator's second item is the next-page marker.
        params['marker'] = next(results)
        for queue in queues:
            queue['href'] = req.path + '/' + queue['name']
        resp.content_location = req.relative_uri
        resp.text = utils.to_json({
            'queues': queues,
            'links': [{
                'rel': 'next',
                'href': req.path + falcon.to_query_str(params),
            }],
        })
        # status defaults to 200

View File

@ -1,72 +0,0 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from zaqar.i18n import _
from zaqar.storage import errors as storage_errors
from zaqar.transport import utils
from zaqar.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class Resource(object):
    """WSGI resource for queue statistics.

    Handles GET on ``/v1/queues/{queue_name}/stats``.
    """
    __slots__ = '_queue_ctrl'
    def __init__(self, queue_controller):
        # queue_controller: storage-layer queue operations.
        self._queue_ctrl = queue_controller
    def on_get(self, req, resp, project_id, queue_name):
        """Returns message stats; zeroed counts for an empty queue.

        Responds 404 when the queue does not exist and 503 on any
        other storage failure.
        """
        try:
            resp_dict = self._queue_ctrl.stats(queue_name,
                                               project=project_id)
            message_stats = resp_dict['messages']
            if message_stats['total'] != 0:
                # Replace raw message IDs with full hrefs rooted at the
                # queue's messages path.
                base_path = req.path[:req.path.rindex('/')] + '/messages/'
                newest = message_stats['newest']
                newest['href'] = base_path + newest['id']
                del newest['id']
                oldest = message_stats['oldest']
                oldest['href'] = base_path + oldest['id']
                del oldest['id']
            resp.content_location = req.path
            resp.text = utils.to_json(resp_dict)
            # status defaults to 200
        except storage_errors.QueueIsEmpty:
            # An empty queue is not an error; synthesize zeroed stats.
            resp_dict = {
                'messages': {
                    'claimed': 0,
                    'free': 0,
                    'total': 0
                }
            }
            resp.text = utils.to_json(resp_dict)
        except storage_errors.DoesNotExist as ex:
            LOG.debug(ex)
            raise wsgi_errors.HTTPNotFound(str(ex))
        except Exception:
            description = _('Queue stats could not be read.')
            LOG.exception(description)
            raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -15,13 +15,11 @@
import falcon
from zaqar.transport import utils
from zaqar.transport.wsgi import v1_0
from zaqar.transport.wsgi import v1_1
from zaqar.transport.wsgi import v2_0
VERSIONS = {
'versions': [
v1_0.VERSION,
v1_1.VERSION,
v2_0.VERSION
]