Share and delete signed test results

Sharing test results is avaible. By deault, any signed test result
considered as private.

Permisson system added. Resource can be reached by any user(if it is shared)  or
only by resource's owner.

Users can delete their own test results.

Lots of refactoring done in this patch.

Change-Id: I2b11eea2b895b3043cf242875ffc2078d5ff3592
This commit is contained in:
sslypushenko 2015-07-31 13:03:17 +03:00
parent 2b89f65ad4
commit c6322c8b5b
18 changed files with 1289 additions and 295 deletions

View File

@ -28,11 +28,11 @@ refstackApp.controller('profileController',
'resource': key,
'format': key.format,
'shortKey': [
key.key.slice(0, 10),
key.pubkey.slice(0, 10),
'.',
key.key.slice(-10, -1)
key.pubkey.slice(-10, -1)
].join('.'),
'key': key.key,
'pubkey': key.pubkey,
'comment': key.comment
});
});
@ -65,7 +65,7 @@ refstackApp.controller('profileController',
});
};
$scope.showRes = function(pubKey){
raiseAlert('success', '', pubKey.key);
raiseAlert('success', '', pubKey.pubkey);
};
$scope.updatePubKeys();
}
@ -104,7 +104,7 @@ refstackApp.controller('showPubKeyModalController',
'use strict';
$scope.pubKey = pubKey.resource;
$scope.rawKey = [pubKey.format,
pubKey.key, pubKey.comment].join('\n');
pubKey.pubkey, pubKey.comment].join('\n');
$scope.deletePubKey = function () {
$scope.pubKey.$remove(
{id: $scope.pubKey.id},

View File

@ -1,13 +1,28 @@
<h3>Test Run Results</h3>
<div ng-show="resultsData" class="test-report">
<strong>Test ID:</strong> {{testId}}<br />
<strong>Upload Date:</strong> {{resultsData.created_at}} UTC<br />
<strong>Duration:</strong> {{resultsData.duration_seconds}} seconds<br />
<strong>Total Number of Passed Tests:</strong> {{resultsData.results.length}}<br />
<hr>
<div ng-show="resultsData" class="container-fluid">
<div class="row">
<div class="pull-left">
<div class="test-report">
<strong>Test ID:</strong> {{testId}}<br />
<div ng-if="isEditingAllowed()"><strong>Cloud ID:</strong> {{resultsData.cpid}}<br /></div>
<strong>Upload Date:</strong> {{resultsData.created_at}} UTC<br />
<strong>Duration:</strong> {{resultsData.duration_seconds}} seconds<br />
<strong>Total Number of Passed Tests:</strong> {{resultsData.results.length}}<br />
<hr>
</div>
</div>
<div class="pull-right">
<div ng-show="isEditingAllowed()">
<button class="btn btn-warning" ng-hide="isShared()" ng-click="shareTestRun(true)">Share</button>
<button class="btn btn-success" ng-show="isShared()" ng-click="shareTestRun(false)">Unshare</button>
<button type="button" class="btn btn-danger" ng-click="deleteTestRun()">Delete</button>
</div>
</div>
</div>
</div>
<div ng-show="resultsData">
<p>See how these results stack up against DefCore capabilities and OpenStack
<a target="_blank" href="http://www.openstack.org/brand/interop/">target marketing programs.</a>

View File

@ -6,8 +6,10 @@ var refstackApp = angular.module('refstackApp');
* view details for a specific test run.
*/
refstackApp.controller('resultsReportController',
['$scope', '$http', '$stateParams', 'refstackApiUrl',
function ($scope, $http, $stateParams, refstackApiUrl) {
['$scope', '$http', '$stateParams',
'$window', 'refstackApiUrl', 'raiseAlert',
function ($scope, $http, $stateParams, $window,
refstackApiUrl, raiseAlert) {
'use strict';
/** The testID extracted from the URL route. */
@ -73,6 +75,53 @@ refstackApp.controller('resultsReportController',
});
};
$scope.isEditingAllowed = function () {
return Boolean($scope.resultsData &&
$scope.resultsData.user_role === 'owner');
};
$scope.isShared = function () {
return Boolean($scope.resultsData &&
'shared' in $scope.resultsData.meta);
};
$scope.shareTestRun = function (shareState) {
var content_url = [
refstackApiUrl, '/results/', $scope.testId, '/meta/shared'
].join('');
if (shareState) {
$scope.shareRequest =
$http.post(content_url, 'true').success(function () {
$scope.resultsData.meta.shared = 'true';
raiseAlert('success', '', 'Test run shared!');
}).error(function (error) {
raiseAlert('danger',
error.title, error.detail);
});
} else {
$scope.shareRequest =
$http.delete(content_url).success(function () {
delete $scope.resultsData.meta.shared;
raiseAlert('success', '', 'Test run unshared!');
}).error(function (error) {
raiseAlert('danger',
error.title, error.detail);
});
}
};
$scope.deleteTestRun = function () {
var content_url = [
refstackApiUrl, '/results/', $scope.testId
].join('');
$scope.deleteRequest =
$http.delete(content_url).success(function () {
$window.history.back();
}).error(function (error) {
raiseAlert('danger',
error.title, error.detail);
});
};
/**
* This will contact the Refstack API server to retrieve the JSON
* content of the capability file corresponding to the selected
@ -302,4 +351,5 @@ refstackApp.controller('resultsReportController',
getResults();
}
]);
]
);

View File

@ -1,5 +1,5 @@
<h3>{{pageHeader}}</h3>
<p>The most recently uploaded community test results are listed here. Currently, these results are anonymous.</p>
<p>The most recently uploaded community test results are listed here.</p>
<div class="result-filters">
<h4>Filters</h4>
@ -48,13 +48,15 @@
<tr>
<th>Upload Date</th>
<th>Test Run ID</th>
<th ng-if="::isUserResults">Shared</th>
</tr>
</thead>
<tbody>
<tr ng-repeat="result in data.results">
<td>{{result.created_at}}</td>
<td><a ui-sref="resultsDetail({testID: result.test_id})">{{result.test_id}}</a></td>
<td><a ui-sref="resultsDetail({testID: result.id})">{{result.id}}</a></td>
<td ng-if="::isUserResults"><span ng-show="result.meta.shared" class="glyphicon glyphicon-share"></span></td>
</tr>
</tbody>
</table>

View File

@ -20,6 +20,8 @@ END_DATE = 'end_date'
CPID = 'cpid'
PAGE = 'page'
SIGNED = 'signed'
OPENID = 'openid'
USER_PUBKEYS = 'pubkeys'
# OpenID parameters
OPENID_MODE = 'openid.mode'
@ -40,3 +42,8 @@ USER_OPENID = 'user_openid'
# Test metadata fields
PUBLIC_KEY = 'public_key'
SHARED_TEST_RUN = 'shared'
# Roles
ROLE_USER = 'user'
ROLE_OWNER = 'owner'

View File

@ -18,6 +18,7 @@
from oslo_config import cfg
from oslo_log import log
import pecan
from pecan import rest
from six.moves.urllib import parse
from refstack import db
@ -31,38 +32,85 @@ LOG = log.getLogger(__name__)
CONF = cfg.CONF
@api_utils.check_permissions(level=const.ROLE_USER)
class MetadataController(rest.RestController):
"""/v1/results/<test_id>/meta handler."""
rw_access_keys = ('shared',)
@pecan.expose('json')
def get(self, test_id):
"""Get test run metadata."""
test_info = db.get_test(test_id)
return test_info['meta']
@pecan.expose('json')
def get_one(self, test_id, key):
"""Get value for key from test run metadata."""
return db.get_test_meta_key(test_id, key)
@api_utils.check_permissions(level=const.ROLE_OWNER)
@pecan.expose('json')
def post(self, test_id, key):
"""Save value for key in test run metadata."""
db.save_test_meta_item(test_id, key, pecan.request.body)
pecan.response.status = 201
@api_utils.check_permissions(level=const.ROLE_OWNER)
@pecan.expose('json')
def delete(self, test_id, key):
"""Delete key from test run metadata."""
db.delete_test_meta_item(test_id, key)
pecan.response.status = 204
class ResultsController(validation.BaseRestControllerWithValidation):
"""/v1/results handler."""
__validator__ = validators.TestResultValidator
def get_item(self, item_id):
"""Handler for getting item."""
test_info = db.get_test(item_id)
if not test_info:
pecan.abort(404)
test_list = db.get_test_results(item_id)
test_name_list = [test_dict[0] for test_dict in test_list]
return {"cpid": test_info.cpid,
"created_at": test_info.created_at,
"duration_seconds": test_info.duration_seconds,
"results": test_name_list}
meta = MetadataController()
def store_item(self, item_in_json):
@pecan.expose('json')
@api_utils.check_permissions(level=const.ROLE_USER)
def get_one(self, test_id):
"""Handler for getting item."""
if api_utils.get_user_role(test_id) == const.ROLE_OWNER:
test_info = db.get_test(
test_id, allowed_keys=['id', 'cpid', 'created_at',
'duration_seconds', 'meta']
)
else:
test_info = db.get_test(test_id)
test_list = db.get_test_results(test_id)
test_name_list = [test_dict['name'] for test_dict in test_list]
test_info.update({'results': test_name_list,
'user_role': api_utils.get_user_role(test_id)})
return test_info
def store_item(self, test):
"""Handler for storing item. Should return new item id."""
item = item_in_json.copy()
test_ = test.copy()
if pecan.request.headers.get('X-Public-Key'):
if 'metadata' not in item:
item['metadata'] = {}
item['metadata'][const.PUBLIC_KEY] = \
if 'meta' not in test_:
test_['meta'] = {}
test_['meta'][const.PUBLIC_KEY] = \
pecan.request.headers.get('X-Public-Key')
test_id = db.store_results(item)
LOG.debug(item)
test_id = db.store_results(test_)
LOG.debug(test_)
return {'test_id': test_id,
'url': parse.urljoin(CONF.ui_url,
CONF.api.test_results_url) % test_id}
@pecan.expose('json')
@api_utils.check_permissions(level=const.ROLE_OWNER)
def delete(self, test_id):
"""Delete test run."""
db.delete_test(test_id)
pecan.response.status = 204
@pecan.expose('json')
def get(self):
"""Get information of all uploaded test results.
@ -92,17 +140,12 @@ class ResultsController(validation.BaseRestControllerWithValidation):
try:
per_page = CONF.api.results_per_page
records = db.get_test_records(page_number, per_page, filters)
results = db.get_test_records(page_number, per_page, filters)
results = []
for r in records:
results.append({
'test_id': r.id,
'created_at': r.created_at,
'cpid': r.cpid,
'url': parse.urljoin(CONF.ui_url,
CONF.api.test_results_url) % r.id
})
for result in results:
result.update({'url': parse.urljoin(
CONF.ui_url, CONF.api.test_results_url
) % result['id']})
page = {'results': results,
'pagination': {

View File

@ -31,11 +31,6 @@ class PublicKeysController(validation.BaseRestControllerWithValidation):
__validator__ = validators.PubkeyValidator
# We don't need expose GET url <pubkeys endpoint>/<id>
def get_item(self, item_id):
"""Handler for getting item."""
pecan.abort(404)
@secure(api_utils.is_authenticated)
@pecan.expose('json')
def post(self, ):
@ -48,7 +43,7 @@ class PublicKeysController(validation.BaseRestControllerWithValidation):
parts = body['raw_key'].strip().split()
if len(parts) == 2:
parts.append('')
pubkey['format'], pubkey['key'], pubkey['comment'] = parts
pubkey['format'], pubkey['pubkey'], pubkey['comment'] = parts
pubkey_id = db.store_pubkey(pubkey)
return pubkey_id
@ -56,17 +51,17 @@ class PublicKeysController(validation.BaseRestControllerWithValidation):
@pecan.expose('json')
def get(self):
"""Retrieve all user's public pubkeys."""
user_openid = api_utils.get_user_id()
return db.get_user_pubkeys(user_openid)
return api_utils.get_user_public_keys()
@secure(api_utils.is_authenticated)
@pecan.expose('json')
def delete(self, pubkey_id):
"""Delete public key."""
pubkeys = db.get_user_pubkeys(api_utils.get_user_id())
pubkeys = api_utils.get_user_public_keys()
for key in pubkeys:
if key['id'] == pubkey_id:
db.delete_pubkey(pubkey_id)
pecan.response.status = 204
return
else:
pecan.abort(404)

View File

@ -45,22 +45,10 @@ class BaseRestControllerWithValidation(rest.RestController):
else:
raise ValueError("__validator__ is not defined")
def get_item(self, item_id): # pragma: no cover
"""Handler for getting item."""
raise NotImplementedError
def store_item(self, item_in_json): # pragma: no cover
"""Handler for storing item. Should return new item id."""
raise NotImplementedError
@pecan.expose('json')
def get_one(self, item_id):
"""Return test results in JSON format.
:param item_id: item ID in uuid4 format or action
"""
return self.get_item(item_id=item_id)
@pecan.expose('json')
def schema(self):
"""Return validation schema."""

View File

@ -15,14 +15,17 @@
"""Refstack API's utils."""
import copy
import functools
import random
import requests
import string
import types
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import pecan
import pecan.rest
import six
from six.moves.urllib import parse
@ -88,10 +91,10 @@ def parse_input_params(expected_input_params):
})
if const.SIGNED in filters:
if is_authenticated():
filters['openid'] = get_user_id()
filters['pubkeys'] = [
' '.join((pk['format'], pk['key']))
for pk in db.get_user_pubkeys(filters['openid'])
filters[const.OPENID] = get_user_id()
filters[const.USER_PUBKEYS] = [
' '.join((pk['format'], pk['pubkey']))
for pk in get_user_public_keys()
]
else:
raise ParseInputsError('To see signed test '
@ -187,7 +190,7 @@ def get_user():
def get_user_public_keys():
"""Return db record for authenticated user."""
"""Return public keys for authenticated user."""
return db.get_user_pubkeys(get_user_id())
@ -197,11 +200,91 @@ def is_authenticated():
try:
if get_user():
return True
except db.UserNotFound:
except db.NotFound:
pass
return False
def enforce_permissions(test_id, level):
"""Check that user role is required for specified test run."""
role = get_user_role(test_id)
if not role:
pecan.abort(401)
if level == const.ROLE_USER:
if role in (const.ROLE_OWNER, const.ROLE_USER):
return
pecan.abort(403)
if level == const.ROLE_OWNER:
if get_user_role(test_id) in (const.ROLE_OWNER,):
return
pecan.abort(403)
else:
raise ValueError('Permission level %s is undefined'
'' % level)
def get_user_role(test_id):
"""Return user role for current user and specified test run."""
if _check_owner(test_id):
return const.ROLE_OWNER
if _check_user(test_id):
return const.ROLE_USER
return
def _check_user(test_id):
"""Check that user has access to shared test run."""
test_pubkey = db.get_test_meta_key(test_id, const.PUBLIC_KEY)
if not test_pubkey:
return True
elif db.get_test_meta_key(test_id, const.SHARED_TEST_RUN):
return True
else:
return _check_owner(test_id)
def _check_owner(test_id):
"""Check that user has access to specified test run as owner."""
if not is_authenticated():
return False
test_pubkey = db.get_test_meta_key(test_id, const.PUBLIC_KEY)
return test_pubkey in [' '.join((pk['format'], pk['pubkey']))
for pk in get_user_public_keys()]
def check_permissions(level):
"""Decorator for checking permissions.
It checks that user have enough permissions to access and manipulate
an information about selected test run.
Any user has role: const.ROLE_USER. It allows access to unsigned, shared
and own test runs.
Owner role: const.ROLE_OWNER allows access only to user's own results.
"""
def decorator(method_or_class):
def wrapper(method):
@functools.wraps(method)
def wrapped(*args, **kwargs):
test_id = args[1]
enforce_permissions(test_id, level)
return method(*args, **kwargs)
return wrapped
if isinstance(method_or_class, types.FunctionType):
return wrapper(method_or_class)
elif issubclass(method_or_class, pecan.rest.RestController):
controller = method_or_class
for method_name in ('get', 'get_all', 'get_one',
'post', 'put', 'delete'):
if hasattr(controller, method_name):
setattr(controller, method_name,
wrapper(getattr(controller, method_name)))
return controller
return decorator
def verify_openid_request(request):
"""Verify OpenID returned request in OpenID."""
verify_params = dict(request.params.copy())

View File

@ -136,7 +136,7 @@ class TestResultValidator(BaseValidator):
try:
key = RSA.importKey(request.headers.get('X-Public-Key', ''))
except ValueError as e:
except (binascii.Error, ValueError) as e:
raise ValidationError('Malformed public key', e)
signer = PKCS1_v1_5.new(key)
data_hash = SHA256.new()
@ -155,8 +155,13 @@ class PubkeyValidator(BaseValidator):
"""Validator for uploaded public pubkeys."""
schema = {
'raw_key': 'string',
'self_signature': 'string',
'type': 'object',
'properties': {
'raw_key': {'type': 'string'},
'self_signature': {'type': 'string'}
},
'required': ['raw_key', 'self_signature'],
'additionalProperties': False
}
def validate(self, request):
@ -176,7 +181,7 @@ class PubkeyValidator(BaseValidator):
try:
key = RSA.importKey(body['raw_key'])
except ValueError as e:
except (binascii.Error, ValueError) as e:
raise ValidationError('Malformed public key', e)
signer = PKCS1_v1_5.new(key)
data_hash = SHA256.new()

View File

@ -36,7 +36,7 @@ _BACKEND_MAPPING = {'sqlalchemy': 'refstack.db.sqlalchemy.api'}
IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING,
lazy=True)
UserNotFound = IMPL.UserNotFound
NotFound = IMPL.NotFound
def store_results(results):
@ -47,22 +47,63 @@ def store_results(results):
return IMPL.store_results(results)
def get_test(test_id):
"""Get test information from the database.
def get_test(test_id, allowed_keys=None):
"""Get test run information from the database.
:param test_id: The ID of the test.
"""
return IMPL.get_test(test_id)
return IMPL.get_test(test_id, allowed_keys=allowed_keys)
def delete_test(test_id):
"""Delete test run information from the database.
:param test_id: The ID of the test.
"""
return IMPL.delete_test(test_id)
def get_test_results(test_id):
"""Get all passed tempest tests for a particular test.
"""Get all passed tempest tests for a specified test run.
:param test_id: The ID of the test.
"""
return IMPL.get_test_results(test_id)
def get_test_meta_key(test_id, key, default=None):
"""Get metadata value related to specified test run.
:param test_id: The ID of the test.
:param key: Metadata key
:param default: Default value
"""
return IMPL.get_test_meta_key(test_id, key, default)
def save_test_meta_item(test_id, key, value):
"""Store or update item value related to specified test run.
:param test_id: The ID of the test.
:param key: Metadata key
"""
return IMPL.save_test_meta_item(test_id, key, value)
def delete_test_meta_item(test_id, key):
"""Delete metadata item related to specified test run.
:param test_id: The ID of the test.
:param key: Metadata key
:param default: Default value
:raise NotFound if default value is not set and no value found
"""
return IMPL.delete_test_meta_item(test_id, key)
def get_test_records(page_number, per_page, filters):
"""Get page with applied filters for uploaded test records.

View File

@ -1,7 +1,7 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
@ -23,7 +23,7 @@ import uuid
from oslo_config import cfg
from oslo_db import options as db_options
from oslo_db.sqlalchemy import session as db_session
from oslo_db.exception import DBDuplicateEntry
from oslo_db import exception as oslo_db_exc
import six
from refstack.api import constants as api_const
@ -37,6 +37,17 @@ _FACADE = None
db_options.set_defaults(cfg.CONF)
class NotFound(Exception):
"""Raise if item not found in db."""
def __init__(self, model, details=None):
"""Init."""
self.model = model
title = details if details else ''.join((model, ' not found.'))
super(NotFound, self).__init__(title)
def _create_facade_lazily():
"""Create DB facade lazily."""
global _FACADE
@ -62,6 +73,39 @@ def get_backend():
return sys.modules[__name__]
def _to_dict(sqlalchemy_object, allowed_keys=None):
if isinstance(sqlalchemy_object, list):
return [_to_dict(obj) for obj in sqlalchemy_object]
if (hasattr(sqlalchemy_object, 'keys')
and hasattr(sqlalchemy_object, 'index')):
return {key: getattr(sqlalchemy_object, key)
for key in sqlalchemy_object.keys()}
if hasattr(sqlalchemy_object, 'default_allowed_keys'):
items = sqlalchemy_object.iteritems()
if not allowed_keys:
allowed_keys = sqlalchemy_object.default_allowed_keys
if allowed_keys:
items = filter(lambda item: item[0] in allowed_keys, items)
result = {}
for key, value in items:
if key in sqlalchemy_object.metadata_keys:
result[key] = {
item.get(sqlalchemy_object.metadata_keys[key]['key']):
item.get(sqlalchemy_object.metadata_keys[key]['value'])
for item in value}
elif hasattr(value, 'default_allowed_keys'):
result[key] = _to_dict(value)
elif (isinstance(value, list)
and hasattr(value[0], 'default_allowed_keys')):
result[key] = [_to_dict(item) for item in value]
else:
result[key] = value
return result
if hasattr(sqlalchemy_object, 'all'):
return _to_dict(sqlalchemy_object.all())
return sqlalchemy_object
def store_results(results):
"""Store test results."""
test = models.Test()
@ -77,7 +121,7 @@ def store_results(results):
test_result.name = result['name']
test_result.uuid = result.get('uuid', None)
test.results.append(test_result)
for k, v in six.iteritems(results.get('metadata', {})):
for k, v in six.iteritems(results.get('meta', {})):
meta = models.TestMeta()
meta.meta_key, meta.value = k, v
test.meta.append(meta)
@ -85,22 +129,79 @@ def store_results(results):
return test_id
def get_test(test_id):
def get_test(test_id, allowed_keys=None):
"""Get test info."""
session = get_session()
test_info = session.query(models.Test).\
filter_by(id=test_id).\
test_info = session.query(models.Test). \
filter_by(id=test_id). \
first()
return test_info
if not test_info:
raise NotFound('Test', 'Test result %s not found' % test_id)
return _to_dict(test_info, allowed_keys)
def delete_test(test_id):
"""Delete test information from the database."""
session = get_session()
with session.begin():
test = session.query(models.Test).filter_by(id=test_id).first()
if test:
session.query(models.TestMeta) \
.filter_by(test_id=test_id).delete()
session.query(models.TestResults) \
.filter_by(test_id=test_id).delete()
session.delete(test)
else:
raise NotFound('Test', 'Test result %s not found' % test_id)
def get_test_meta_key(test_id, key, default=None):
"""Get metadata value related to specified test run."""
session = get_session()
meta_item = session.query(models.TestMeta). \
filter_by(test_id=test_id). \
filter_by(meta_key=key). \
first()
value = meta_item.value if meta_item else default
return value
def save_test_meta_item(test_id, key, value):
"""Store or update item value related to specified test run."""
session = get_session()
meta_item = (session.query(models.TestMeta)
.filter_by(test_id=test_id)
.filter_by(meta_key=key).first() or models.TestMeta())
meta_item.test_id = test_id
meta_item.meta_key = key
meta_item.value = value
with session.begin():
meta_item.save(session)
def delete_test_meta_item(test_id, key):
"""Delete metadata item related to specified test run."""
session = get_session()
meta_item = session.query(models.TestMeta). \
filter_by(test_id=test_id). \
filter_by(meta_key=key). \
first()
if meta_item:
with session.begin():
session.delete(meta_item)
else:
raise NotFound('TestMeta',
'Metadata key %s '
'not found for test run %s' % (key, test_id))
def get_test_results(test_id):
"""Get test results."""
session = get_session()
results = session.query(models.TestResults.name).\
filter_by(test_id=test_id).\
results = session.query(models.TestResults). \
filter_by(test_id=test_id). \
all()
return results
return [_to_dict(result) for result in results]
def _apply_filters_for_query(query, filters):
@ -122,29 +223,30 @@ def _apply_filters_for_query(query, filters):
query = (query
.join(models.Test.meta)
.filter(models.TestMeta.meta_key == api_const.PUBLIC_KEY)
.filter(models.TestMeta.value.in_(filters['pubkeys']))
.filter(models.TestMeta.value.in_(
filters[api_const.USER_PUBKEYS]))
)
else:
signed_results = (query.session
.query(models.TestMeta.test_id)
.filter_by(meta_key=api_const.PUBLIC_KEY))
query = query.filter(models.Test.id.notin_(signed_results))
shared_results = (query.session
.query(models.TestMeta.test_id)
.filter_by(meta_key=api_const.SHARED_TEST_RUN))
query = (query.filter(models.Test.id.notin_(signed_results))
.union(query.filter(models.Test.id.in_(shared_results))))
return query
def get_test_records(page, per_page, filters):
"""Get page with list of test records."""
session = get_session()
query = session.query(models.Test.id,
models.Test.created_at,
models.Test.cpid)
query = session.query(models.Test)
query = _apply_filters_for_query(query, filters)
results = query.order_by(models.Test.created_at.desc()).\
offset(per_page * (page - 1)).\
limit(per_page)
return results
results = query.order_by(models.Test.created_at.desc()). \
offset(per_page * (page - 1)). \
limit(per_page).all()
return _to_dict(results)
def get_test_records_count(filters):
@ -156,19 +258,12 @@ def get_test_records_count(filters):
return records_count
class UserNotFound(Exception):
"""Raise if user not found."""
pass
def user_get(user_openid):
"""Get user info by openid."""
session = get_session()
user = session.query(models.User).filter_by(openid=user_openid).first()
if user is None:
raise UserNotFound('User with OpenID %s not found' % user_openid)
raise NotFound('User', 'User with OpenID %s not found' % user_openid)
return user
@ -176,7 +271,7 @@ def user_save(user_info):
"""Create user DB record if it exists, otherwise record will be updated."""
try:
user = user_get(user_info['openid'])
except UserNotFound:
except NotFound:
user = models.User()
session = get_session()
@ -191,10 +286,10 @@ def store_pubkey(pubkey_info):
pubkey = models.PubKey()
pubkey.openid = pubkey_info['openid']
pubkey.format = pubkey_info['format']
pubkey.pubkey = pubkey_info['key']
pubkey.pubkey = pubkey_info['pubkey']
pubkey.md5_hash = hashlib.md5(
base64.b64decode(
pubkey_info['key'].encode('ascii')
pubkey_info['pubkey'].encode('ascii')
)
).hexdigest()
pubkey.comment = pubkey_info['comment']
@ -207,8 +302,8 @@ def store_pubkey(pubkey_info):
if not pubkeys_collision:
pubkey.save(session)
else:
raise DBDuplicateEntry(columns=['pubkeys.pubkey'],
value=pubkey.pubkey)
raise oslo_db_exc.DBDuplicateEntry(columns=['pubkeys.pubkey'],
value=pubkey.pubkey)
return pubkey.id
@ -224,12 +319,4 @@ def get_user_pubkeys(user_openid):
"""Get public pubkeys for specified user."""
session = get_session()
pubkeys = session.query(models.PubKey).filter_by(openid=user_openid).all()
result = []
for pubkey in pubkeys:
result.append({
'id': pubkey.id,
'format': pubkey.format,
'key': pubkey.pubkey,
'comment': pubkey.comment
})
return result
return _to_dict(pubkeys)

View File

@ -36,10 +36,21 @@ class RefStackBase(models.ModelBase,
"""Base class for RefStack Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
@property
def metadata_keys(self): # pragma: no cover
"""Model keys with metadata structure. Will be converted in dict."""
return dict()
@property
def default_allowed_keys(self): # pragma: no cover
"""Default keys will be present in resulted dict."""
return ()
metadata = None
class Test(BASE, RefStackBase):
class Test(BASE, RefStackBase): # pragma: no cover
"""Test."""
@ -51,8 +62,24 @@ class Test(BASE, RefStackBase):
results = orm.relationship('TestResults', backref='test')
meta = orm.relationship('TestMeta', backref='test')
@property
def _extra_keys(self):
"""Relation should be pointed directly."""
return ['results', 'meta']
class TestResults(BASE, RefStackBase):
@property
def metadata_keys(self):
"""Model keys with metadata structure."""
return {'meta': {'key': 'meta_key',
'value': 'value'}}
@property
def default_allowed_keys(self):
"""Default keys."""
return 'id', 'created_at', 'duration_seconds', 'meta'
class TestResults(BASE, RefStackBase): # pragma: no cover
"""Test results."""
@ -70,8 +97,13 @@ class TestResults(BASE, RefStackBase):
name = sa.Column(sa.String(512, collation='latin1_swedish_ci'),)
uuid = sa.Column(sa.String(36))
@property
def default_allowed_keys(self):
"""Default keys."""
return 'name', 'uuid'
class TestMeta(BASE, RefStackBase):
class TestMeta(BASE, RefStackBase): # pragma: no cover
"""Test metadata."""
@ -85,8 +117,13 @@ class TestMeta(BASE, RefStackBase):
meta_key = sa.Column(sa.String(64), index=True, nullable=False)
value = sa.Column(sa.Text())
@property
def default_allowed_keys(self):
"""Default keys."""
return 'meta_key', 'value'
class User(BASE, RefStackBase):
class User(BASE, RefStackBase): # pragma: no cover
"""User information."""
@ -98,8 +135,18 @@ class User(BASE, RefStackBase):
fullname = sa.Column(sa.String(128))
pubkeys = orm.relationship('PubKey', backref='user')
@property
def _extra_keys(self):
"""Relation should be pointed directly."""
return ['pubkeys']
class PubKey(BASE, RefStackBase):
@property
def default_allowed_keys(self):
"""Default keys."""
return 'openid', 'email', 'fullname', 'pubkeys'
class PubKey(BASE, RefStackBase): # pragma: no cover
"""User public pubkeys."""
@ -113,3 +160,8 @@ class PubKey(BASE, RefStackBase):
pubkey = sa.Column(sa.Text(), nullable=False)
comment = sa.Column(sa.String(128))
md5_hash = sa.Column(sa.String(32), nullable=False, index=True)
@property
def default_allowed_keys(self):
"""Default keys."""
return 'id', 'openid', 'format', 'pubkey', 'comment'

View File

@ -13,3 +13,17 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Refstack unittests."""
import mock
from oslotest import base
class RefstackBaseTestCase(base.BaseTestCase):
"""Refstack test base class."""
def setup_mock(self, *args, **kwargs):
"""Mock in test setup."""
patcher = mock.patch(*args, **kwargs)
self.addCleanup(patcher.stop)
return patcher.start()

View File

@ -21,7 +21,6 @@ import sys
import httmock
import mock
from oslo_config import fixture as config_fixture
from oslotest import base
import requests
from six.moves.urllib import parse
import webob.exc
@ -33,6 +32,7 @@ from refstack.api.controllers import capabilities
from refstack.api.controllers import results
from refstack.api.controllers import validation
from refstack.api.controllers import user
from refstack.tests import unit as base
def safe_json_dump(content):
@ -44,7 +44,23 @@ def safe_json_dump(content):
return content
class RootControllerTestCase(base.BaseTestCase):
class BaseControllerTestCase(base.RefstackBaseTestCase):
def setUp(self):
super(BaseControllerTestCase, self).setUp()
self.mock_request = self.setup_mock('pecan.request')
self.mock_response = self.setup_mock('pecan.response')
self.mock_abort = \
self.setup_mock('pecan.abort',
side_effect=webob.exc.HTTPError)
self.mock_get_user_role = \
self.setup_mock('refstack.api.utils.get_user_role')
self.mock_is_authenticated = \
self.setup_mock('refstack.api.utils.is_authenticated',
return_value=True)
class RootControllerTestCase(BaseControllerTestCase):
@mock.patch('pecan.expose', return_value=lambda f: f)
def test_index(self, expose_mock):
@ -58,7 +74,7 @@ class RootControllerTestCase(base.BaseTestCase):
expose_mock.assert_called_with(generic=True, template='index.html')
class ResultsControllerTestCase(base.BaseTestCase):
class ResultsControllerTestCase(BaseControllerTestCase):
def setUp(self):
super(ResultsControllerTestCase, self).setUp()
@ -78,33 +94,58 @@ class ResultsControllerTestCase(base.BaseTestCase):
@mock.patch('refstack.db.get_test')
@mock.patch('refstack.db.get_test_results')
def test_get(self, mock_get_test_res, mock_get_test):
test_info = mock.Mock()
test_info.cpid = 'foo'
test_info.created_at = 'bar'
test_info.duration_seconds = 999
self.mock_get_user_role.return_value = const.ROLE_USER
test_info = {'created_at': 'bar',
'duration_seconds': 999}
mock_get_test.return_value = test_info
mock_get_test_res.return_value = [('test1',), ('test2',), ('test3',)]
mock_get_test_res.return_value = [{'name': 'test1'},
{'name': 'test2'}]
actual_result = self.controller.get_one('fake_arg')
expected_result = {
'cpid': 'foo',
'created_at': 'bar',
'duration_seconds': 999,
'results': ['test1', 'test2', 'test3']
'results': ['test1', 'test2'],
'user_role': const.ROLE_USER
}
self.assertEqual(actual_result, expected_result)
mock_get_test_res.assert_called_once_with('fake_arg')
mock_get_test.assert_called_once_with('fake_arg')
@mock.patch('refstack.db.get_test')
@mock.patch('refstack.db.get_test_results')
def test_get_for_owner(self, mock_get_test_res, mock_get_test):
self.mock_get_user_role.return_value = const.ROLE_OWNER
test_info = {'cpid': 'foo',
'created_at': 'bar',
'duration_seconds': 999}
mock_get_test.return_value = test_info
mock_get_test_res.return_value = [{'name': 'test1'},
{'name': 'test2'}]
actual_result = self.controller.get_one('fake_arg')
expected_result = {
'cpid': 'foo',
'created_at': 'bar',
'duration_seconds': 999,
'results': ['test1', 'test2'],
'user_role': const.ROLE_OWNER
}
self.assertEqual(actual_result, expected_result)
mock_get_test_res.assert_called_once_with('fake_arg')
mock_get_test.assert_called_once_with(
'fake_arg', allowed_keys=['id', 'cpid', 'created_at',
'duration_seconds', 'meta']
)
@mock.patch('refstack.db.store_results')
@mock.patch('pecan.response')
@mock.patch('pecan.request')
def test_post(self, mock_request, mock_response, mock_store_results):
mock_request.body = '{"answer": 42}'
mock_request.headers = {}
def test_post(self, mock_store_results):
self.mock_request.body = '{"answer": 42}'
self.mock_request.headers = {}
mock_store_results.return_value = 'fake_test_id'
result = self.controller.post()
self.assertEqual(
@ -113,17 +154,13 @@ class ResultsControllerTestCase(base.BaseTestCase):
'url': parse.urljoin(self.ui_url,
self.test_results_url) % 'fake_test_id'}
)
self.assertEqual(mock_response.status, 201)
self.assertEqual(self.mock_response.status, 201)
mock_store_results.assert_called_once_with({'answer': 42})
@mock.patch('refstack.db.store_results')
@mock.patch('pecan.response')
@mock.patch('pecan.request')
def test_post_with_sign(self, mock_request,
mock_response,
mock_store_results):
mock_request.body = '{"answer": 42}'
mock_request.headers = {
def test_post_with_sign(self, mock_store_results):
self.mock_request.body = '{"answer": 42}'
self.mock_request.headers = {
'X-Signature': 'fake-sign',
'X-Public-Key': 'fake-key'
}
@ -132,65 +169,51 @@ class ResultsControllerTestCase(base.BaseTestCase):
self.assertEqual(result,
{'test_id': 'fake_test_id',
'url': self.test_results_url % 'fake_test_id'})
self.assertEqual(mock_response.status, 201)
self.assertEqual(self.mock_response.status, 201)
mock_store_results.assert_called_once_with(
{'answer': 42, 'metadata': {const.PUBLIC_KEY: 'fake-key'}}
{'answer': 42, 'meta': {const.PUBLIC_KEY: 'fake-key'}}
)
@mock.patch('pecan.abort')
@mock.patch('refstack.db.get_test')
def test_get_item_failed(self, mock_get_test, mock_abort):
def test_get_item_failed(self, mock_get_test):
mock_get_test.return_value = None
mock_abort.side_effect = Exception()
self.assertRaises(Exception,
self.controller.get_item,
self.assertRaises(webob.exc.HTTPError,
self.controller.get_one,
'fake_id')
@mock.patch('pecan.abort')
@mock.patch('refstack.api.utils.parse_input_params')
def test_get_failed_in_parse_input_params(self,
parse_inputs,
pecan_abort):
def test_get_failed_in_parse_input_params(self, parse_inputs):
parse_inputs.side_effect = api_utils.ParseInputsError()
pecan_abort.side_effect = Exception()
self.assertRaises(Exception,
self.assertRaises(webob.exc.HTTPError,
self.controller.get)
@mock.patch('refstack.db.get_test_records_count')
@mock.patch('pecan.abort')
@mock.patch('refstack.api.utils.parse_input_params')
def test_get_failed_in_get_test_records_number(self,
parse_inputs,
pecan_abort,
db_get_count):
db_get_count.side_effect = Exception()
pecan_abort.side_effect = Exception()
self.assertRaises(Exception,
db_get_count.side_effect = api_utils.ParseInputsError()
self.assertRaises(webob.exc.HTTPError,
self.controller.get)
@mock.patch('refstack.db.get_test_records_count')
@mock.patch('refstack.api.utils.parse_input_params')
@mock.patch('refstack.api.utils.get_page_number')
@mock.patch('pecan.abort')
def test_get_failed_in_get_page_number(self,
pecan_abort,
get_page,
parse_input,
db_get_count):
get_page.side_effect = api_utils.ParseInputsError()
pecan_abort.side_effect = Exception()
self.assertRaises(Exception,
self.assertRaises(webob.exc.HTTPError,
self.controller.get)
@mock.patch('refstack.db.get_test_records')
@mock.patch('refstack.db.get_test_records_count')
@mock.patch('refstack.api.utils.parse_input_params')
@mock.patch('refstack.api.utils.get_page_number')
@mock.patch('pecan.abort')
def test_get_failed_in_get_test_records(self,
pecan_abort,
get_page,
parce_input,
db_get_count,
@ -198,8 +221,7 @@ class ResultsControllerTestCase(base.BaseTestCase):
get_page.return_value = (mock.Mock(), mock.Mock())
db_get_test.side_effect = Exception()
pecan_abort.side_effect = Exception()
self.assertRaises(Exception,
self.assertRaises(webob.exc.HTTPError,
self.controller.get)
@mock.patch('refstack.db.get_test_records')
@ -228,19 +250,13 @@ class ResultsControllerTestCase(base.BaseTestCase):
per_page,
'api')
record = mock.Mock()
record.id = 111
record.created_at = '12345'
record.cpid = '54321'
record = {'id': 111, 'created_at': '12345', 'cpid': '54321'}
expected_record = record.copy()
expected_record['url'] = self.test_results_url % record['id']
db_get_test.return_value = [record]
expected_result = {
'results': [{
'test_id': record.id,
'created_at': record.created_at,
'cpid': record.cpid,
'url': self.test_results_url % record.id
}],
'results': [expected_record],
'pagination': {
'current_page': page_number,
'total_pages': total_pages_number
@ -258,12 +274,23 @@ class ResultsControllerTestCase(base.BaseTestCase):
db_get_test.assert_called_once_with(page_number, per_page, filters)
@mock.patch('refstack.db.delete_test')
def test_delete(self, mock_db_delete):
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.controller.delete('test_id')
self.assertEqual(204, self.mock_response.status)
self.mock_get_user_role.return_value = const.ROLE_USER
self.mock_abort.side_effect = webob.exc.HTTPError()
self.assertRaises(webob.exc.HTTPError,
self.controller.delete, 'test_id')
class CapabilitiesControllerTestCase(base.BaseTestCase):
class CapabilitiesControllerTestCase(BaseControllerTestCase):
def setUp(self):
super(CapabilitiesControllerTestCase, self).setUp()
self.controller = capabilities.CapabilitiesController()
self.mock_abort.side_effect = None
def test_get_capabilities(self):
"""Test when getting a list of all capability files."""
@ -280,8 +307,7 @@ class CapabilitiesControllerTestCase(base.BaseTestCase):
result = self.controller.get()
self.assertEqual(['2015.03.json'], result)
@mock.patch('pecan.abort')
def test_get_capabilities_error_code(self, mock_abort):
def test_get_capabilities_error_code(self):
"""Test when the HTTP status code isn't a 200 OK. The status should
be propogated."""
@httmock.all_requests
@ -291,15 +317,14 @@ class CapabilitiesControllerTestCase(base.BaseTestCase):
with httmock.HTTMock(github_api_mock):
self.controller.get()
mock_abort.assert_called_with(404)
self.mock_abort.assert_called_with(404)
@mock.patch('requests.get')
@mock.patch('pecan.abort')
def test_get_capabilities_exception(self, mock_abort, mock_request):
def test_get_capabilities_exception(self, mock_requests_get):
"""Test when the GET request raises an exception."""
mock_request.side_effect = requests.exceptions.RequestException()
mock_requests_get.side_effect = requests.exceptions.RequestException()
self.controller.get()
mock_abort.assert_called_with(500)
self.mock_abort.assert_called_with(500)
def test_get_capability_file(self):
"""Test when getting a specific capability file"""
@ -312,8 +337,7 @@ class CapabilitiesControllerTestCase(base.BaseTestCase):
result = self.controller.get_one('2015.03')
self.assertEqual({'foo': 'bar'}, result)
@mock.patch('pecan.abort')
def test_get_capability_file_error_code(self, mock_abort):
def test_get_capability_file_error_code(self):
"""Test when the HTTP status code isn't a 200 OK. The status should
be propogated."""
@httmock.all_requests
@ -323,18 +347,17 @@ class CapabilitiesControllerTestCase(base.BaseTestCase):
with httmock.HTTMock(github_api_mock):
self.controller.get_one('2010.03')
mock_abort.assert_called_with(404)
self.mock_abort.assert_called_with(404)
@mock.patch('requests.get')
@mock.patch('pecan.abort')
def test_get_capability_file_exception(self, mock_abort, mock_request):
def test_get_capability_file_exception(self, mock_requests_get):
"""Test when the GET request raises an exception."""
mock_request.side_effect = requests.exceptions.RequestException()
mock_requests_get.side_effect = requests.exceptions.RequestException()
self.controller.get_one('2010.03')
mock_abort.assert_called_with(500)
self.mock_abort.assert_called_with(500)
class BaseRestControllerWithValidationTestCase(base.BaseTestCase):
class BaseRestControllerWithValidationTestCase(BaseControllerTestCase):
def setUp(self):
super(BaseRestControllerWithValidationTestCase, self).setUp()
@ -355,15 +378,6 @@ class BaseRestControllerWithValidationTestCase(base.BaseTestCase):
self.assertEqual(mock_response.status, 201)
self.controller.store_item.assert_called_once_with([42])
def test_get_one_return_item(self):
self.validator.assert_id = mock.Mock(return_value=True)
self.controller.get_item = mock.Mock(return_value='fake_item')
result = self.controller.get_one('fake_arg')
self.assertEqual(result, 'fake_item')
self.controller.get_item.assert_called_once_with(item_id='fake_arg')
def test_get_one_return_schema(self):
self.validator.assert_id = mock.Mock(return_value=False)
self.validator.schema = 'fake_schema'
@ -371,7 +385,7 @@ class BaseRestControllerWithValidationTestCase(base.BaseTestCase):
self.assertEqual(result, 'fake_schema')
class ProfileControllerTestCase(base.BaseTestCase):
class ProfileControllerTestCase(BaseControllerTestCase):
def setUp(self):
super(ProfileControllerTestCase, self).setUp()
@ -383,16 +397,14 @@ class ProfileControllerTestCase(base.BaseTestCase):
fullname='Dobby'))
@mock.patch('refstack.api.utils.get_user_session',
return_value={const.USER_OPENID: 'foo@bar.org'})
@mock.patch('refstack.api.utils.is_authenticated', return_value=True)
def test_get(self, mock_is_authenticated, mock_get_user_session,
mock_user_get):
def test_get(self, mock_get_user_session, mock_user_get):
actual_result = self.controller.get()
self.assertEqual({'openid': 'foo@bar.org',
'email': 'foo@bar.org',
'fullname': 'Dobby'}, actual_result)
class AuthControllerTestCase(base.BaseTestCase):
class AuthControllerTestCase(BaseControllerTestCase):
def setUp(self):
super(AuthControllerTestCase, self).setUp()
@ -403,20 +415,17 @@ class AuthControllerTestCase(base.BaseTestCase):
self.CONF.set_override('ui_url', '127.0.0.1')
@mock.patch('refstack.api.utils.get_user_session')
@mock.patch('refstack.api.utils.is_authenticated', return_value=True)
@mock.patch('pecan.redirect', side_effect=webob.exc.HTTPRedirection)
def test_signed_signin(self, mock_redirect, mock_is_authenticated,
mock_get_user_session):
def test_signed_signin(self, mock_redirect, mock_get_user_session):
mock_session = mock.MagicMock(**{const.USER_OPENID: 'foo@bar.org'})
mock_get_user_session.return_value = mock_session
self.assertRaises(webob.exc.HTTPRedirection, self.controller.signin)
mock_redirect.assert_called_with('127.0.0.1')
@mock.patch('refstack.api.utils.get_user_session')
@mock.patch('refstack.api.utils.is_authenticated', return_value=False)
@mock.patch('pecan.redirect', side_effect=webob.exc.HTTPRedirection)
def test_unsigned_signin(self, mock_redirect, mock_is_authenticated,
mock_get_user_session):
def test_unsigned_signin(self, mock_redirect, mock_get_user_session):
self.mock_is_authenticated.return_value = False
mock_session = mock.MagicMock(**{const.USER_OPENID: 'foo@bar.org'})
mock_get_user_session.return_value = mock_session
self.assertRaises(webob.exc.HTTPRedirection, self.controller.signin)
@ -424,93 +433,88 @@ class AuthControllerTestCase(base.BaseTestCase):
mock_redirect.call_args[1]['location'])
@mock.patch('socket.gethostbyname', return_value='1.1.1.1')
@mock.patch('pecan.request')
@mock.patch('refstack.api.utils.get_user_session')
@mock.patch('pecan.abort', side_effect=webob.exc.HTTPError)
def test_signin_return_failed(self, mock_abort, mock_get_user_session,
mock_request, mock_socket):
def test_signin_return_failed(self, mock_get_user_session, mock_socket):
self.mock_abort.side_effect = webob.exc.HTTPError
mock_session = mock.MagicMock(**{const.USER_OPENID: 'foo@bar.org',
const.CSRF_TOKEN: '42'})
mock_get_user_session.return_value = mock_session
mock_request.remote_addr = '1.1.1.2'
self.mock_request.remote_addr = '1.1.1.2'
mock_request.GET = {
self.mock_request.GET = {
const.OPENID_ERROR: 'foo is not bar!!!'
}
mock_request.environ['beaker.session'] = {
self.mock_request.environ['beaker.session'] = {
const.CSRF_TOKEN: 42
}
self.assertRaises(webob.exc.HTTPError, self.controller.signin_return)
mock_abort.assert_called_once_with(
401, mock_request.GET[const.OPENID_ERROR])
self.mock_abort.assert_called_once_with(
401, self.mock_request.GET[const.OPENID_ERROR])
self.assertNotIn(const.CSRF_TOKEN,
mock_request.environ['beaker.session'])
self.mock_request.environ['beaker.session'])
mock_abort.reset_mock()
mock_request.environ['beaker.session'] = {
self.mock_abort.reset_mock()
self.mock_request.environ['beaker.session'] = {
const.CSRF_TOKEN: 42
}
mock_request.GET = {
self.mock_request.GET = {
const.OPENID_MODE: 'cancel'
}
self.assertRaises(webob.exc.HTTPError, self.controller.signin_return)
mock_abort.assert_called_once_with(
self.mock_abort.assert_called_once_with(
401, 'Authentication canceled.')
self.assertNotIn(const.CSRF_TOKEN,
mock_request.environ['beaker.session'])
self.mock_request.environ['beaker.session'])
mock_abort.reset_mock()
mock_request.environ['beaker.session'] = {
self.mock_abort.reset_mock()
self.mock_request.environ['beaker.session'] = {
const.CSRF_TOKEN: 42
}
mock_request.GET = {}
self.mock_request.GET = {}
self.assertRaises(webob.exc.HTTPError, self.controller.signin_return)
mock_abort.assert_called_once_with(
self.mock_abort.assert_called_once_with(
401, 'Authentication is failed. Try again.')
self.assertNotIn(const.CSRF_TOKEN,
mock_request.environ['beaker.session'])
self.mock_request.environ['beaker.session'])
mock_abort.reset_mock()
mock_request.environ['beaker.session'] = {
self.mock_abort.reset_mock()
self.mock_request.environ['beaker.session'] = {
const.CSRF_TOKEN: 42
}
mock_request.GET = {const.CSRF_TOKEN: '24'}
mock_request.remote_addr = '1.1.1.1'
self.mock_request.GET = {const.CSRF_TOKEN: '24'}
self.mock_request.remote_addr = '1.1.1.1'
self.assertRaises(webob.exc.HTTPError, self.controller.signin_return)
mock_abort.assert_called_once_with(
self.mock_abort.assert_called_once_with(
401, 'Authentication is failed. Try again.')
self.assertNotIn(const.CSRF_TOKEN,
mock_request.environ['beaker.session'])
self.mock_request.environ['beaker.session'])
@mock.patch('refstack.api.utils.verify_openid_request', return_value=True)
@mock.patch('refstack.db.user_save')
@mock.patch('pecan.request')
@mock.patch('refstack.api.utils.get_user_session')
@mock.patch('pecan.redirect', side_effect=webob.exc.HTTPRedirection)
def test_signin_return_success(self, mock_redirect, mock_get_user_session,
mock_request, mock_user, mock_verify):
mock_user, mock_verify):
mock_session = mock.MagicMock(**{const.USER_OPENID: 'foo@bar.org',
const.CSRF_TOKEN: 42})
mock_session.get = mock.Mock(return_value=42)
mock_get_user_session.return_value = mock_session
mock_request.GET = {
self.mock_request.GET = {
const.OPENID_CLAIMED_ID: 'foo@bar.org',
const.OPENID_NS_SREG_EMAIL: 'foo@bar.org',
const.OPENID_NS_SREG_FULLNAME: 'foo',
const.CSRF_TOKEN: 42
}
mock_request.environ['beaker.session'] = {
self.mock_request.environ['beaker.session'] = {
const.CSRF_TOKEN: 42
}
self.assertRaises(webob.exc.HTTPRedirection,
self.controller.signin_return)
@mock.patch('pecan.request')
@mock.patch('refstack.api.utils.is_authenticated', return_value=True)
@mock.patch('pecan.redirect', side_effect=webob.exc.HTTPRedirection)
def test_signout(self, mock_redirect, mock_is_authenticated,
mock_request):
def test_signout(self, mock_redirect, mock_request):
mock_request.environ['beaker.session'] = {
const.CSRF_TOKEN: 42
}
@ -518,3 +522,103 @@ class AuthControllerTestCase(base.BaseTestCase):
mock_redirect.assert_called_with('127.0.0.1')
self.assertNotIn(const.CSRF_TOKEN,
mock_request.environ['beaker.session'])
class MetadataControllerTestCase(BaseControllerTestCase):
def setUp(self):
super(MetadataControllerTestCase, self).setUp()
self.controller = results.MetadataController()
@mock.patch('refstack.db.get_test')
def test_get(self, mock_db_get_test):
self.mock_get_user_role.return_value = const.ROLE_USER
mock_db_get_test.return_value = {'meta': 'fake_meta'}
self.assertEqual('fake_meta', self.controller.get('test_id'))
mock_db_get_test.assert_called_once_with('test_id')
@mock.patch('refstack.db.get_test_meta_key')
def test_get_one(self, mock_db_get_test_meta_key):
self.mock_get_user_role.return_value = const.ROLE_USER
mock_db_get_test_meta_key.return_value = 42
self.assertEqual(42, self.controller.get_one('test_id', 'answer'))
mock_db_get_test_meta_key.assert_called_once_with('test_id', 'answer')
@mock.patch('refstack.db.save_test_meta_item')
def test_post(self, mock_save_test_meta_item):
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.controller.post('test_id', 'answer')
self.assertEqual(201, self.mock_response.status)
mock_save_test_meta_item.assert_called_once_with(
'test_id', 'answer', self.mock_request.body)
self.mock_get_user_role.return_value = const.ROLE_USER
self.mock_abort.side_effect = webob.exc.HTTPError()
self.assertRaises(webob.exc.HTTPError,
self.controller.post, 'test_id', 'answer')
@mock.patch('refstack.db.delete_test_meta_item')
def test_delete(self, mock_delete_test_meta_item):
self.mock_get_user_role.return_value = const.ROLE_OWNER
self.controller.delete('test_id', 'answer')
self.assertEqual(204, self.mock_response.status)
mock_delete_test_meta_item.assert_called_once_with('test_id', 'answer')
self.mock_get_user_role.return_value = const.ROLE_USER
self.mock_abort.side_effect = webob.exc.HTTPError()
self.assertRaises(webob.exc.HTTPError,
self.controller.delete, 'test_id', 'answer')
class PublicKeysControllerTestCase(BaseControllerTestCase):
def setUp(self):
super(PublicKeysControllerTestCase, self).setUp()
self.controller = user.PublicKeysController()
@mock.patch('refstack.api.utils.get_user_public_keys')
def test_get(self, mock_get_user_public_keys):
mock_get_user_public_keys.return_value = 42
self.assertEqual(42, self.controller.get())
mock_get_user_public_keys.assert_called_once_with()
@mock.patch('refstack.api.utils.get_user_id')
@mock.patch('refstack.db.store_pubkey')
def test_post(self, mock_store_pubkey, mock_get_user_id):
self.controller.validator.validate = mock.Mock()
mock_get_user_id.return_value = 'fake_id'
mock_store_pubkey.return_value = 42
raw_key = 'fake key Don\'t_Panic.'
fake_pubkey = {
'format': 'fake',
'pubkey': 'key',
'comment': 'Don\'t_Panic.',
'openid': 'fake_id'
}
self.mock_request.body = json.dumps({'raw_key': raw_key})
self.controller.post()
self.assertEqual(201, self.mock_response.status)
mock_store_pubkey.assert_called_once_with(fake_pubkey)
mock_store_pubkey.reset_mock()
raw_key = 'fake key'
fake_pubkey = {
'format': 'fake',
'pubkey': 'key',
'comment': '',
'openid': 'fake_id'
}
self.mock_request.body = json.dumps({'raw_key': raw_key})
self.controller.post()
mock_store_pubkey.assert_called_once_with(fake_pubkey)
@mock.patch('refstack.db.delete_pubkey')
@mock.patch('refstack.api.utils.get_user_public_keys')
def test_delete(self, mock_get_user_public_keys, mock_delete_pubkey):
mock_get_user_public_keys.return_value = ({'id': 'key_id'},)
self.controller.delete('key_id')
self.assertEqual(204, self.mock_response.status)
mock_delete_pubkey.assert_called_once_with('key_id')
self.assertRaises(webob.exc.HTTPError,
self.controller.delete, 'other_key_id')

View File

@ -19,7 +19,9 @@ import mock
from oslo_config import fixture as config_fixture
from oslo_utils import timeutils
from oslotest import base
from pecan import rest
from six.moves.urllib import parse
from webob import exc
from refstack.api import constants as const
from refstack.api import utils as api_utils
@ -118,7 +120,9 @@ class APIUtilsTestCase(base.BaseTestCase):
expected_params)
@mock.patch.object(api_utils, '_get_input_params_from_request')
def test_parse_input_params_success(self, mock_get_input):
@mock.patch.object(api_utils, 'is_authenticated', return_value=False)
def test_parse_input_params_failed_in_auth(self, mock_is_authenticated,
mock_get_input):
fmt = '%Y-%m-%d %H:%M:%S'
self.CONF.set_override('input_date_format',
fmt,
@ -127,8 +131,34 @@ class APIUtilsTestCase(base.BaseTestCase):
const.START_DATE: '2015-03-26 15:04:40',
const.END_DATE: '2015-03-26 15:04:50',
const.CPID: '12345',
const.SIGNED: True
}
expected_params = mock.Mock()
mock_get_input.return_value = raw_filters
self.assertRaises(api_utils.ParseInputsError,
api_utils.parse_input_params, expected_params)
@mock.patch.object(api_utils, '_get_input_params_from_request')
@mock.patch.object(api_utils, 'is_authenticated', return_value=True)
@mock.patch.object(api_utils, 'get_user_id', return_value='fake_id')
@mock.patch('refstack.db.get_user_pubkeys')
def test_parse_input_params_success(self, mock_get_user_pubkeys,
mock_get_user_id,
mock_is_authenticated,
mock_get_input):
fmt = '%Y-%m-%d %H:%M:%S'
self.CONF.set_override('input_date_format',
fmt,
'api')
raw_filters = {
const.START_DATE: '2015-03-26 15:04:40',
const.END_DATE: '2015-03-26 15:04:50',
const.CPID: '12345',
const.SIGNED: True
}
fake_pubkeys = ({'format': 'fake',
'pubkey': 'fake_pk'},)
mock_get_user_pubkeys.return_value = fake_pubkeys
expected_params = mock.Mock()
mock_get_input.return_value = raw_filters
@ -145,7 +175,10 @@ class APIUtilsTestCase(base.BaseTestCase):
expected_result = {
const.START_DATE: parsed_start_date,
const.END_DATE: parsed_end_date,
const.CPID: '12345'
const.CPID: '12345',
const.SIGNED: True,
const.OPENID: 'fake_id',
const.USER_PUBKEYS: ['fake fake_pk'],
}
result = api_utils.parse_input_params(expected_params)
@ -292,10 +325,133 @@ class APIUtilsTestCase(base.BaseTestCase):
mock_get_user.return_value = 'Dobby'
self.assertEqual(True, api_utils.is_authenticated())
mock_db.user_get.called_once_with(mock_session)
mock_db.UserNotFound = db.UserNotFound
mock_get_user.side_effect = mock_db.UserNotFound
mock_db.NotFound = db.NotFound
mock_get_user.side_effect = mock_db.NotFound('User')
self.assertEqual(False, api_utils.is_authenticated())
@mock.patch('pecan.abort', side_effect=exc.HTTPError)
@mock.patch('refstack.db.get_test_meta_key')
@mock.patch.object(api_utils, 'is_authenticated')
@mock.patch.object(api_utils, 'get_user_public_keys')
def test_check_get_user_role(self, mock_get_user_public_keys,
mock_is_authenticated,
mock_get_test_meta_key,
mock_pecan_abort):
# Check user level
mock_get_test_meta_key.return_value = None
self.assertEqual(const.ROLE_USER, api_utils.get_user_role('fake_test'))
api_utils.enforce_permissions('fake_test', const.ROLE_USER)
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_OWNER)
mock_get_test_meta_key.side_effect = {
('fake_test', const.PUBLIC_KEY): 'fake key',
('fake_test', const.SHARED_TEST_RUN): 'true',
}.get
self.assertEqual(const.ROLE_USER, api_utils.get_user_role('fake_test'))
api_utils.enforce_permissions('fake_test', const.ROLE_USER)
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_OWNER)
mock_is_authenticated.return_value = True
mock_get_user_public_keys.return_value = [{'format': 'fake',
'pubkey': 'key'}]
mock_get_test_meta_key.side_effect = {
('fake_test', const.PUBLIC_KEY): 'fake key',
('fake_test', const.SHARED_TEST_RUN): 'true',
}.get
self.assertEqual(const.ROLE_USER, api_utils.get_user_role('fake_test'))
api_utils.enforce_permissions('fake_test', const.ROLE_USER)
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_OWNER)
# Check owner level
mock_is_authenticated.return_value = True
mock_get_user_public_keys.return_value = [{'format': 'fake',
'pubkey': 'key'}]
mock_get_test_meta_key.side_effect = lambda *args: {
('fake_test', const.PUBLIC_KEY): 'fake key',
('fake_test', const.SHARED_TEST_RUN): None,
}.get(args)
self.assertEqual(const.ROLE_OWNER,
api_utils.get_user_role('fake_test'))
api_utils.enforce_permissions('fake_test', const.ROLE_USER)
api_utils.enforce_permissions('fake_test', const.ROLE_OWNER)
# Check negative cases
mock_is_authenticated.return_value = False
mock_get_test_meta_key.side_effect = lambda *args: {
('fake_test', const.PUBLIC_KEY): 'fake key',
('fake_test', const.SHARED_TEST_RUN): None,
}.get(args)
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_USER)
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_OWNER)
mock_is_authenticated.return_value = True
mock_get_user_public_keys.return_value = [{'format': 'fake',
'pubkey': 'key'}]
mock_get_test_meta_key.side_effect = lambda *args: {
('fake_test', const.PUBLIC_KEY): 'fake other_key',
('fake_test', const.SHARED_TEST_RUN): None,
}.get(args)
self.assertEqual(None, api_utils.get_user_role('fake_test'))
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_USER)
self.assertRaises(exc.HTTPError, api_utils.enforce_permissions,
'fake_test', const.ROLE_OWNER)
@mock.patch('pecan.abort', side_effect=exc.HTTPError)
@mock.patch('refstack.db.get_test_meta_key')
@mock.patch.object(api_utils, 'is_authenticated')
@mock.patch.object(api_utils, 'get_user_public_keys')
def test_check_permissions(self, mock_get_user_public_keys,
mock_is_authenticated,
mock_get_test_meta_key,
mock_pecan_abort):
@api_utils.check_permissions(level=const.ROLE_USER)
class ControllerWithPermissions(rest.RestController):
def get(self, test_id):
return test_id
@api_utils.check_permissions(level=const.ROLE_OWNER)
def delete(self, test_id):
return test_id
@api_utils.check_permissions(level='fake_role')
def post(self, test_id):
return test_id
fake_controller = ControllerWithPermissions()
public_test = 'fake_public_test'
private_test = 'fake_test'
mock_get_user_public_keys.return_value = [{'format': 'fake',
'pubkey': 'key'}]
mock_get_test_meta_key.side_effect = lambda *args: {
(public_test, const.PUBLIC_KEY): None,
(private_test, const.PUBLIC_KEY): 'fake key',
(private_test, const.SHARED_TEST_RUN): None,
}.get(args)
mock_is_authenticated.return_value = True
self.assertEqual(public_test, fake_controller.get(public_test))
self.assertRaises(exc.HTTPError, fake_controller.delete, public_test)
self.assertEqual(private_test, fake_controller.get(private_test))
self.assertEqual(private_test, fake_controller.delete(private_test))
mock_is_authenticated.return_value = False
self.assertEqual(public_test, fake_controller.get(public_test))
self.assertRaises(exc.HTTPError, fake_controller.delete, public_test)
self.assertRaises(exc.HTTPError, fake_controller.get, private_test)
self.assertRaises(exc.HTTPError, fake_controller.delete, private_test)
self.assertRaises(ValueError, fake_controller.post, public_test)
@mock.patch('requests.post')
@mock.patch('pecan.abort')
def test_verify_openid_request(self, mock_abort, mock_post):

View File

@ -15,14 +15,19 @@
"""Tests for database."""
import base64
import hashlib
import six
import mock
from oslo_config import fixture as config_fixture
from oslo_db import exception as oslo_db_exc
from oslotest import base
import sqlalchemy.orm
from refstack import db
from refstack.api import constants as api_const
from refstack.db.sqlalchemy import api
from refstack.db.sqlalchemy import models
class DBAPITestCase(base.BaseTestCase):
@ -36,7 +41,7 @@ class DBAPITestCase(base.BaseTestCase):
@mock.patch.object(api, 'get_test')
def test_get_test(self, mock_get_test):
db.get_test(12345)
mock_get_test.assert_called_once_with(12345)
mock_get_test.assert_called_once_with(12345, allowed_keys=None)
@mock.patch.object(api, 'get_test_results')
def test_get_test_results(self, mock_get_test_results):
@ -108,6 +113,43 @@ class DBBackendTestCase(base.BaseTestCase):
self.config_fixture = config_fixture.Config()
self.CONF = self.useFixture(self.config_fixture).conf
def test_to_dict(self):
fake_query_result = mock.Mock()
fake_query_result.keys.return_value = ('fake_id',)
fake_query_result.index = 1
fake_query_result.fake_id = 12345
self.assertEqual({'fake_id': 12345}, api._to_dict(fake_query_result))
fake_query_result_list = [fake_query_result]
self.assertEqual([{'fake_id': 12345}],
api._to_dict(fake_query_result_list))
fake_query = mock.Mock(spec=sqlalchemy.orm.Query)
fake_query.all.return_value = fake_query_result
self.assertEqual({'fake_id': 12345}, api._to_dict(fake_query))
fake_model = mock.Mock(spec=models.RefStackBase)
fake_model.default_allowed_keys = ('fake_id', 'meta',
'child', 'childs')
fake_child = mock.Mock(spec=models.RefStackBase)
fake_child.iteritems.return_value = {'child_id': 42}.items()
fake_child.default_allowed_keys = ('child_id',)
fake_child.metadata_keys = {}
actuall_dict = {'fake_id': 12345,
'meta': [{'meta_key': 'answer',
'value': 42}],
'child': fake_child,
'childs': [fake_child]}
fake_model.iteritems.return_value = actuall_dict.items()
fake_model.metadata_keys = {'meta': {'key': 'meta_key',
'value': 'value'}}
self.assertEqual({'fake_id': 12345,
'meta': {'answer': 42},
'child': {'child_id': 42},
'childs': [{'child_id': 42}]},
api._to_dict(fake_model))
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.models.TestResults')
@mock.patch('refstack.db.sqlalchemy.models.Test')
@ -122,7 +164,7 @@ class DBBackendTestCase(base.BaseTestCase):
{'name': 'tempest.some.test'},
{'name': 'tempest.test', 'uid': '12345678'}
],
'metadata': {'answer': 42}
'meta': {'answer': 42}
}
_id = 12345
@ -152,15 +194,15 @@ class DBBackendTestCase(base.BaseTestCase):
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.models.Test')
def test_get_test(self, mock_test, mock_get_session):
@mock.patch.object(api, '_to_dict', side_effect=lambda x, *args: x)
def test_get_test(self, mock_to_dict, mock_test, mock_get_session):
session = mock_get_session.return_value
session.query = mock.Mock()
query = session.query.return_value
query.filter_by = mock.Mock()
filter_by = query.filter_by.return_value
expected_result = 'fake_test_info'
filter_by.first = mock.Mock(return_value=expected_result)
mock_result = 'fake_test_info'
filter_by.first = mock.Mock(return_value=mock_result)
test_id = 'fake_id'
actual_result = api.get_test(test_id)
@ -168,7 +210,105 @@ class DBBackendTestCase(base.BaseTestCase):
session.query.assert_called_once_with(mock_test)
query.filter_by.assert_called_once_with(id=test_id)
filter_by.first.assert_called_once_with()
self.assertEqual(expected_result, actual_result)
self.assertEqual(mock_result, actual_result)
session = mock_get_session.return_value
session.query = mock.Mock()
query = session.query.return_value
query.filter_by.return_value.first.return_value = None
self.assertRaises(api.NotFound, api.get_test, 'fake_id')
@mock.patch('refstack.db.sqlalchemy.api.models')
@mock.patch.object(api, 'get_session')
def test_delete_test(self, mock_get_session, mock_models):
session = mock_get_session.return_value
test_query = mock.Mock()
test_meta_query = mock.Mock()
test_results_query = mock.Mock()
session.query = mock.Mock(side_effect={
mock_models.Test: test_query,
mock_models.TestMeta: test_meta_query,
mock_models.TestResults: test_results_query
}.get)
db.delete_test('fake_id')
session.begin.assert_called_once_with()
test_query.filter_by.return_value.first\
.assert_called_once_with()
test_meta_query.filter_by.return_value.delete\
.assert_called_once_with()
test_results_query.filter_by.return_value.delete\
.assert_called_once_with()
session.delete.assert_called_once_with(
test_query.filter_by.return_value.first.return_value)
mock_get_session.return_value = mock.MagicMock()
session = mock_get_session.return_value
session.query.return_value\
.filter_by.return_value\
.first.return_value = None
self.assertRaises(api.NotFound, db.delete_test, 'fake_id')
@mock.patch('refstack.db.sqlalchemy.api.models')
@mock.patch.object(api, 'get_session')
def test_get_test_meta_key(self, mock_get_session, mock_models):
session = mock_get_session.return_value
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.first.return_value = mock.Mock(value=42)
self.assertEqual(42, db.get_test_meta_key('fake_id', 'fake_key'))
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.first.return_value = None
self.assertEqual(24, db.get_test_meta_key('fake_id', 'fake_key', 24))
@mock.patch('refstack.db.sqlalchemy.api.models')
@mock.patch.object(api, 'get_session')
def test_save_test_meta_item(self, mock_get_session, mock_models):
session = mock_get_session.return_value
mock_meta_item = mock.Mock()
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.first.return_value = mock_meta_item
db.save_test_meta_item('fake_id', 'fake_key', 42)
self.assertEqual('fake_id', mock_meta_item.test_id)
self.assertEqual('fake_key', mock_meta_item.meta_key)
self.assertEqual(42, mock_meta_item.value)
session.begin.assert_called_once_with()
mock_meta_item.save.assert_called_once_with(session)
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.first.return_value = None
mock_meta_item = mock.Mock()
mock_models.TestMeta.return_value = mock_meta_item
db.save_test_meta_item('fake_id', 'fake_key', 42)
self.assertEqual('fake_id', mock_meta_item.test_id)
self.assertEqual('fake_key', mock_meta_item.meta_key)
self.assertEqual(42, mock_meta_item.value)
@mock.patch('refstack.db.sqlalchemy.api.models')
@mock.patch.object(api, 'get_session')
def test_delete_test_meta_item(self, mock_get_session, mock_models):
session = mock_get_session.return_value
mock_meta_item = mock.Mock()
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.first.return_value = mock_meta_item
db.delete_test_meta_item('fake_id', 'fake_key')
session.begin.assert_called_once_with()
session.delete.assert_called_once_with(mock_meta_item)
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.first.return_value = None
self.assertRaises(db.NotFound,
db.delete_test_meta_item, 'fake_id', 'fake_key')
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.models.TestResults')
@ -180,14 +320,15 @@ class DBBackendTestCase(base.BaseTestCase):
query = session.query.return_value
query.filter_by = mock.Mock()
filter_by = query.filter_by.return_value
expected_result = 'fake_test_results'
filter_by.all = mock.Mock(return_value=expected_result)
mock_result = 'fake_test_results'
expected_result = ['fake_test_results']
filter_by.all = mock.Mock(return_value=[mock_result])
test_id = 'fake_id'
actual_result = api.get_test_results(test_id)
mock_get_session.assert_called_once_with()
session.query.assert_called_once_with(mock_test_result.name)
session.query.assert_called_once_with(mock_test_result)
query.filter_by.assert_called_once_with(test_id=test_id)
filter_by.all.assert_called_once_with()
self.assertEqual(expected_result, actual_result)
@ -206,6 +347,15 @@ class DBBackendTestCase(base.BaseTestCase):
api_const.CPID: 'fake3'
}
unsigned_query = (query
.filter.return_value
.filter.return_value
.filter.return_value)
unsigned_query.session.query.return_value.filter_by.side_effect = (
'signed_results_query', 'shared_results_query'
)
result = api._apply_filters_for_query(query, filters)
query.filter.assert_called_once_with(mock_test.created_at >=
@ -219,18 +369,56 @@ class DBBackendTestCase(base.BaseTestCase):
query.filter.assert_called_once_with(mock_test.cpid ==
filters[api_const.CPID])
query = query.filter.return_value
unsigned_query.session.query.assert_has_calls((
mock.call(mock_meta.test_id),
mock.call().filter_by(meta_key='public_key'),
mock.call(mock_meta.test_id),
mock.call().filter_by(meta_key='shared'),
))
unsigned_query.filter.assert_has_calls((
mock.call(mock_test.id.notin_.return_value),
mock.call(mock_test.id.in_.return_value),
mock.call().union(unsigned_query.filter.return_value)
))
filtered_query = unsigned_query.filter.return_value.union.return_value
query.session.query.assert_called_once_with(mock_meta.test_id)
meta_query = query.session.query.return_value
self.assertEqual(result, filtered_query)
meta_query.filter_by.\
assert_called_once_with(meta_key=api_const.PUBLIC_KEY)
unsigned_test_id_query = meta_query.filter_by.return_value
mock_test.id.notin_.assert_called_once_with(unsigned_test_id_query)
query.filter.assert_called_once_with(mock_test.id.notin_.return_value)
@mock.patch('refstack.db.sqlalchemy.models.Test')
@mock.patch('refstack.db.sqlalchemy.models.TestMeta')
def test_apply_filters_for_query_signed(self, mock_meta,
mock_test):
query = mock.Mock()
mock_test.created_at = six.text_type()
mock_meta.test_id = six.text_type()
filtered_query = query.filter.return_value
filters = {
api_const.START_DATE: 'fake1',
api_const.END_DATE: 'fake2',
api_const.CPID: 'fake3',
api_const.USER_PUBKEYS: ['fake_pk'],
api_const.SIGNED: 'true'
}
signed_query = (query
.filter.return_value
.filter.return_value
.filter.return_value)
result = api._apply_filters_for_query(query, filters)
signed_query.join.assert_called_once_with(mock_test.meta)
signed_query = signed_query.join.return_value
signed_query.filter.assert_called_once_with(
mock_meta.meta_key == api_const.PUBLIC_KEY
)
signed_query = signed_query.filter.return_value
mock_meta.value.in_.assert_called_once_with(
filters[api_const.USER_PUBKEYS])
signed_query.filter.assert_called_once_with(
mock_meta.value.in_.return_value)
filtered_query = signed_query.filter.return_value
self.assertEqual(result, filtered_query)
@mock.patch.object(api, '_apply_filters_for_query')
@ -252,14 +440,12 @@ class DBBackendTestCase(base.BaseTestCase):
second_query = mock_apply.return_value
ordered_query = second_query.order_by.return_value
query_with_offset = ordered_query.offset.return_value
query_with_offset.limit.return_value = 'fake_uploads'
query_with_offset.limit.return_value.all.return_value = 'fake_uploads'
result = api.get_test_records(2, per_page, filters)
mock_get_session.assert_called_once_with()
session.query.assert_called_once_with(mock_model.id,
mock_model.created_at,
mock_model.cpid)
session.query.assert_called_once_with(mock_model)
mock_apply.assert_called_once_with(first_query, filters)
second_query.order_by.\
assert_called_once_with(mock_model.created_at.desc())
@ -314,11 +500,11 @@ class DBBackendTestCase(base.BaseTestCase):
query = session.query.return_value
filtered = query.filter_by.return_value
filtered.first.return_value = None
self.assertRaises(api.UserNotFound, api.user_get, user_openid)
self.assertRaises(api.NotFound, api.user_get, user_openid)
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.models.User')
@mock.patch.object(api, 'user_get', side_effect=api.UserNotFound)
@mock.patch.object(api, 'user_get', side_effect=api.NotFound('User'))
def test_user_update_or_create(self, mock_get_user, mock_model,
mock_get_session):
user_info = {'openid': 'user@example.com'}
@ -332,3 +518,69 @@ class DBBackendTestCase(base.BaseTestCase):
user.save.assert_called_once_with(session=session)
user.update.assert_called_once_with(user_info)
session.begin.assert_called_once_with()
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.api.models')
def test_store_pubkey(self, mock_models, mock_get_session):
session = mock_get_session.return_value
pubkey_info = {
'openid': 'fake_id',
'format': 'ssh-rsa',
'pubkey': 'cHV0aW4gaHVpbG8=',
'comment': 'comment'
}
mock_pubkey = mock.Mock()
mock_pubkey.id = 42
mock_models.PubKey.return_value = mock_pubkey
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.all.return_value = None
self.assertEqual(42, db.store_pubkey(pubkey_info))
self.assertEqual('fake_id', mock_pubkey.openid)
self.assertEqual('ssh-rsa', mock_pubkey.format)
self.assertEqual('cHV0aW4gaHVpbG8=', mock_pubkey.pubkey)
self.assertEqual(
hashlib.md5(
base64.b64decode('cHV0aW4gaHVpbG8='.encode('ascii'))
).hexdigest(),
'3b30cd2bdac1eeb7e92dfc983bf5f943'
)
mock_pubkey.save.assert_called_once_with(session)
session.query.return_value\
.filter_by.return_value\
.filter_by.return_value\
.all.return_value = mock_pubkey
self.assertRaises(oslo_db_exc.DBDuplicateEntry,
db.store_pubkey, pubkey_info)
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.api.models')
def test_delete_pubkey(self, mock_models, mock_get_session):
session = mock_get_session.return_value
db.delete_pubkey('key_id')
key = session\
.query.return_value\
.filter_by.return_value\
.first.return_value
session.query.assert_called_once_with(mock_models.PubKey)
session.query.return_value.filter_by.assert_called_once_with(
id='key_id')
session.delete.assert_called_once_with(key)
session.begin.assert_called_once_with()
@mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.api.models')
@mock.patch.object(api, '_to_dict', side_effect=lambda x: x)
def test_get_user_pubkeys(self, mock_to_dict, mock_models,
mock_get_session):
session = mock_get_session.return_value
actual_keys = db.get_user_pubkeys('user_id')
keys = session \
.query.return_value \
.filter_by.return_value \
.all.return_value
session.query.assert_called_once_with(mock_models.PubKey)
session.query.return_value.filter_by.assert_called_once_with(
openid='user_id')
self.assertEqual(keys, actual_keys)

View File

@ -64,7 +64,7 @@ class ValidatorsTestCase(base.BaseTestCase):
class TestResultValidatorTestCase(base.BaseTestCase):
"""Test case for TestResultValidator."""
FAKE_TESTS_RESULTS_JSON = {
FAKE_JSON = {
'cpid': 'foo',
'duration_seconds': 10,
'results': [
@ -87,18 +87,17 @@ class TestResultValidatorTestCase(base.BaseTestCase):
def test_validation(self):
with mock.patch('jsonschema.validate') as mock_validate:
request = mock.Mock()
request.body = json.dumps(self.FAKE_TESTS_RESULTS_JSON)
request.body = json.dumps(self.FAKE_JSON)
request.headers = {}
self.validator.validate(request)
mock_validate.assert_called_once_with(self.FAKE_TESTS_RESULTS_JSON,
mock_validate.assert_called_once_with(self.FAKE_JSON,
self.validator.schema)
@mock.patch('jsonschema.validate')
def test_validation_with_signature(self, mock_validate):
def test_validation_with_signature(self):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_TESTS_RESULTS_JSON)
request.body = json.dumps(self.FAKE_JSON)
data_hash = SHA256.new()
data_hash.update(request.body.encode('utf-8'))
key = RSA.generate(4096)
@ -109,8 +108,6 @@ class TestResultValidatorTestCase(base.BaseTestCase):
'X-Public-Key': key.publickey().exportKey('OpenSSH')
}
self.validator.validate(request)
mock_validate.assert_called_once_with(self.FAKE_TESTS_RESULTS_JSON,
self.validator.schema)
def test_validation_fail_no_json(self):
wrong_request = mock.Mock()
@ -142,7 +139,7 @@ class TestResultValidatorTestCase(base.BaseTestCase):
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_TESTS_RESULTS_JSON)
request.body = json.dumps(self.FAKE_JSON)
key = RSA.generate(2048)
request.headers = {
'X-Signature': binascii.b2a_hex('fake_sign'.encode('utf-8')),
@ -178,3 +175,106 @@ class TestResultValidatorTestCase(base.BaseTestCase):
self.validator.validate(request)
except validators.ValidationError as e:
self.assertIsInstance(e.exc, ValueError)
class PubkeyValidatorTestCase(base.BaseTestCase):
"""Test case for TestResultValidator."""
FAKE_JSON = {
'raw_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC4GAwIjFN6mkN09Vfc8h'
'VCnbztex/kjVdPlGraBLR+M9VoehOMJgLawpn2f+rM7NjDDgIwvj0kHVMZ'
'cBk5MZ1eQg3ACtP2EBw0SLLZ9uMSuHoDTf8oHVgNlNrHL3sc/QYJYfSqRh'
'FS2JvIVNnC2iG8jwnxUBI9rBspYU8AkrrczQ== Don\'t_Panic.',
'self_signature': '9d6c4c74b4ec47bb4db8f288a502d2d2f686e7228d387377b8'
'c89ee67345ad04f8e518e0a627afe07217defbbd8acdd6dd88'
'74104e631731a1fb4dab1a34e06a0680f11337d1fae0b7a9ad'
'5942e0aacd2245c4cf7a78a96c4800eb4f6d8c363822aaaf43'
'aa3a648ddee84f3ea0b91e2e977ca19df72ad80226c12b1221'
'c2fb61'
}
def setUp(self):
super(PubkeyValidatorTestCase, self).setUp()
self.validator = validators.PubkeyValidator()
def test_validation(self):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_JSON)
self.validator.validate(request)
def test_validation_fail_no_json(self):
wrong_request = mock.Mock()
wrong_request.body = 'foo'
self.assertRaises(validators.ValidationError,
self.validator.validate,
wrong_request)
try:
self.validator.validate(wrong_request)
except validators.ValidationError as e:
self.assertIsInstance(e.exc, ValueError)
def test_validation_fail(self):
wrong_request = mock.Mock()
wrong_request.body = json.dumps({
'foo': 'bar'
})
self.assertRaises(validators.ValidationError,
self.validator.validate,
wrong_request)
try:
self.validator.validate(wrong_request)
except validators.ValidationError as e:
self.assertIsInstance(e.exc, jsonschema.ValidationError)
@mock.patch('jsonschema.validate')
def test_validation_with_broken_signature(self, mock_validate):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
body = self.FAKE_JSON.copy()
body['self_signature'] = 'deadbeef'
request = mock.Mock()
request.body = json.dumps(body)
try:
self.validator.validate(request)
except validators.ValidationError as e:
self.assertEqual(e.title,
'Signature verification failed')
body = {
'raw_key': 'fake key comment',
'self_signature': 'deadbeef'
}
request = mock.Mock()
request.body = json.dumps(body)
try:
self.validator.validate(request)
except validators.ValidationError as e:
self.assertEqual(e.title,
'Public key has unsupported format')
body = {
'raw_key': 'ssh-rsa key comment',
'self_signature': 'deadbeef?'
}
request = mock.Mock()
request.body = json.dumps(body)
try:
self.validator.validate(request)
except validators.ValidationError as e:
self.assertEqual(e.title,
'Malformed signature')
body = {
'raw_key': 'ssh-rsa key comment',
'self_signature': 'deadbeef'
}
request = mock.Mock()
request.body = json.dumps(body)
try:
self.validator.validate(request)
except validators.ValidationError as e:
self.assertEqual(e.title,
'Malformed public key')