Added smoke tests for consumers

Moved consumers tests from functional to smoke tests.
Added test to handle get consumers.
Also added consumer behaviors and the consumer model.

Change-Id: I3466fbf6c0f13ba9ea483a8251f19b00104968a3
This commit is contained in:
Thomas Dinkjian 2014-11-24 12:44:25 -06:00
parent 74525c300a
commit 0b8743948a
5 changed files with 324 additions and 212 deletions

View File

@ -0,0 +1,79 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v1.behaviors import base_behaviors
from functionaltests.api.v1.models import consumer_model
class ConsumerBehaviors(base_behaviors.BaseBehaviors):
def create_consumer(self, model, container_ref):
"""Register a consumer to a container.
:param model: The metadata for the consumer
:param container_ref: Full reference to a container
:return: A tuple containing the response from the create
and the href to the newly registered consumer
"""
url = '{0}/consumers'.format(container_ref)
resp = self.client.post(url, request_model=model)
returned_data = resp.json()
consumer_data = returned_data['consumers']
return resp, consumer_data
def get_consumers(self, container_ref, limit=10, offset=0):
"""Gets a list of consumers on a container.
:param container_ref: Full reference to a container
:param limit: limits number of returned consumers
:param offset: represents how many records to skip before retrieving
the list
:return: The response from the get and refs to the next/previous list
of consumers
"""
url = '{0}/consumers'.format(container_ref)
params = {'limit': limit, 'offset': offset}
resp = self.client.get(url, params=params)
consumer_list = resp.json()
consumers, next_ref, prev_ref = self.client.get_list_of_models(
consumer_list, consumer_model.ConsumerModel)
return resp, consumers, next_ref, prev_ref
def delete_consumer(self, model, container_ref):
"""Deletes a consumer from a container.
:param model: The metadata for the consumer
:param container_ref: Full reference to a container
:return: The response from the delete
"""
url = '{0}/consumers'.format(container_ref)
resp = self.client.delete(url, request_model=model)
returned_data = resp.json()
consumer_data = returned_data['consumers']
return resp, consumer_data

View File

@ -1,211 +0,0 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from functionaltests.api import base
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import secret_models
create_secret_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
create_consumer_data = {
"name": "consumername",
"URL": "consumerURL"
}
create_consumer_data_for_delete = {
"name": "consumername2",
"URL": "consumerURL2"
}
create_consumer_data_for_recreate = {
"name": "consumername3",
"URL": "consumerURL3"
}
create_consumer_data_for_idempotency = {
"name": "consumername4",
"URL": "consumerURL4"
}
create_container_data = {
"name": "containername",
"type": "generic",
"secret_refs": [
{
"name": "secret1",
},
{
"name": "secret2",
}
]
}
class ConsumersTestCase(base.TestCase):
def _create_a_secret(self):
secret_model = secret_models.SecretModel(**create_secret_data)
resp, secret_ref = self.secret_behaviors.create_secret(secret_model)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(secret_ref)
return secret_ref
def setUp(self):
super(ConsumersTestCase, self).setUp()
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
# Set up two secrets
secret_ref_1 = self._create_a_secret()
secret_ref_2 = self._create_a_secret()
# Create a container with our secrets
create_container_data['secret_refs'][0]['secret_ref'] = secret_ref_1
create_container_data['secret_refs'][1]['secret_ref'] = secret_ref_2
container_json_data = json.dumps(create_container_data)
resp = self.client.post(
'containers', container_json_data)
self.assertEqual(resp.status_code, 201)
returned_data = resp.json()
container_ref = returned_data['container_ref']
self.assertIsNotNone(container_ref)
self.container_id = container_ref.split('/')[-1]
def tearDown(self):
self.secret_behaviors.delete_all_created_secrets()
super(ConsumersTestCase, self).tearDown()
def test_create_consumer(self):
"""Covers consumer creation.
All of the data needed to create the consumer is provided in a
single POST.
"""
json_data = json.dumps(create_consumer_data)
resp = self.client.post(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data, consumer_data)
def test_delete_consumer(self):
"""Covers consumer deletion.
A consumer is first created, and then the consumer is deleted and
verified to no longer exist.
"""
json_data = json.dumps(create_consumer_data_for_delete)
# Register the consumer once
resp = self.client.post(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_delete, consumer_data)
# Delete the consumer
resp = self.client.delete(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertNotIn(create_consumer_data_for_delete, consumer_data)
def test_recreate_consumer(self):
"""Covers consumer recreation."""
json_data = json.dumps(create_consumer_data_for_recreate)
# Register the consumer once
resp = self.client.post(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_recreate, consumer_data)
# Delete the consumer
resp = self.client.delete(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertNotIn(create_consumer_data_for_recreate, consumer_data)
# Register the consumer again
resp = self.client.post(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_recreate, consumer_data)
def test_create_consumer_is_idempotent(self):
"""Covers checking that create consumer is idempotent."""
json_data = json.dumps(create_consumer_data_for_idempotency)
# Register the consumer once
resp = self.client.post(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_idempotency, consumer_data)
# Register the consumer again, without deleting it first
resp = self.client.post(
'containers/{0}/consumers'.format(self.container_id),
json_data)
self.assertEqual(resp.status_code, 200)
returned_data = resp.json()
consumer_data = returned_data['consumers']
self.assertIsNotNone(consumer_data)
self.assertIn(create_consumer_data_for_idempotency, consumer_data)
count = consumer_data.count(create_consumer_data_for_idempotency)
self.assertEqual(1, count)

View File

@ -0,0 +1,28 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v1.models.base_models import BaseModel
class ConsumerModel(BaseModel):
def __init__(self, name=None, URL=None, created=None, updated=None,
status=None):
super(ConsumerModel, self).__init__()
self.name = name
self.URL = URL
self.created = created
self.updated = updated
self.status = status

View File

@ -0,0 +1,216 @@
# Copyright (c) 2014 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from testtools import testcase
from functionaltests.api import base
from functionaltests.api.v1.behaviors import consumer_behaviors
from functionaltests.api.v1.behaviors import container_behaviors
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import consumer_model
from functionaltests.api.v1.models import container_models
from functionaltests.api.v1.models import secret_models
create_secret_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
default_consumer_data = {
"name": "consumername",
"URL": "consumerURL"
}
create_container_data = {
"name": "containername",
"type": "generic",
"secret_refs": [
{
"name": "secret1",
},
{
"name": "secret2",
}
]
}
class ConsumersTestCase(base.TestCase):
default_data = default_consumer_data
def _create_a_secret(self):
secret_model = secret_models.SecretModel(**create_secret_data)
resp, secret_ref = self.secret_behaviors.create_secret(secret_model)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(secret_ref)
return secret_ref
def setUp(self):
super(ConsumersTestCase, self).setUp()
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.container_behaviors = container_behaviors.ContainerBehaviors(
self.client
)
self.consumer_behaviors = consumer_behaviors.ConsumerBehaviors(
self.client
)
self.consumer_data = copy.deepcopy(self.default_data)
# Set up two secrets
secret_ref_1 = self._create_a_secret()
secret_ref_2 = self._create_a_secret()
# Create a container with our secrets
create_container_data['secret_refs'][0]['secret_ref'] = secret_ref_1
create_container_data['secret_refs'][1]['secret_ref'] = secret_ref_2
container_model = container_models.ContainerModel(
**create_container_data
)
resp, container_ref = self.container_behaviors.create_container(
container_model
)
self.assertEqual(resp.status_code, 201)
self.assertIsNotNone(container_ref)
self.container_ref = container_ref
def tearDown(self):
self.secret_behaviors.delete_all_created_secrets()
super(ConsumersTestCase, self).tearDown()
@testcase.attr('positive')
def test_create_consumer_defaults(self):
"""Covers consumer creation.
All of the data needed to create the consumer is provided in a
single POST.
"""
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
@testcase.attr('positive')
def test_get_consumer_defaults(self):
"""Tests getting a list of consumers for a container."""
# Create first consumer
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
# Create second consumer
test_model.name = "consumername2"
test_model.URL = "consumerURL2"
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
# Get list of consumers
resp, consumers, nref, pref = self.consumer_behaviors.get_consumers(
self.container_ref
)
self.assertEqual(resp.status_code, 200)
self.assertIn("consumername", consumers[0].name)
self.assertIn("consumername2", consumers[1].name)
@testcase.attr('positive')
def test_delete_consumer_defaults(self):
"""Covers consumer deletion.
A consumer is first created, and then the consumer is deleted and
verified to no longer exist.
"""
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
# Delete the consumer
resp, consumer_data = self.consumer_behaviors.delete_consumer(
test_model, self.container_ref
)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
self.assertNotIn(test_model.name, consumer_data)
self.assertNotIn(test_model.URL, consumer_data)
@testcase.attr('positive')
def test_recreate_consumer_defaults(self):
"""Covers consumer recreation."""
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
# Delete the consumer
resp, consumer_data = self.consumer_behaviors.delete_consumer(
test_model, self.container_ref
)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
self.assertNotIn(test_model.name, consumer_data)
self.assertNotIn(test_model.URL, consumer_data)
# Register the consumer again
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
@testcase.attr('positive')
def test_create_consumer_defaults_is_idempotent(self):
"""Covers checking that create consumer is idempotent."""
# Register the consumer once
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
# Register the consumer again, without deleting it first
test_model = consumer_model.ConsumerModel(**self.consumer_data)
resp, consumer_data = self.consumer_behaviors.create_consumer(
test_model, self.container_ref)
count = consumer_data.count(self.consumer_data)
self.assertEqual(resp.status_code, 200)
self.assertIsNotNone(consumer_data)
self.assertIn(self.consumer_data, consumer_data)
self.assertEqual(1, count)

View File

@ -103,7 +103,7 @@ class BarbicanClient(object):
next_ref = item_list.get('next')
elif 'previous' == item:
prev_ref = item_list.get('previous')
elif item in ('secrets', 'orders', 'containers'):
elif item in ('secrets', 'orders', 'containers', 'consumers'):
for entity in item_list.get(item):
models.append(model_type(**entity))