
The repo is Python 3 now, so update hacking to version 3.0 which supports Python 3. Fix problems found by updated hacking version. Update local hacking checks for new flake8, remove test B314 since that tests difference between Python 2 and 3, there's no need to advise using six anymore. Use oslotest.base directly, this fixes the hacking tests. Remove ddt usage in testsuite, it does not work with current hacking version anymore. Change-Id: Iee4584c6fde08728c017468d9de1db73f2c79d8d
226 lines
7.5 KiB
Python
226 lines
7.5 KiB
Python
# Copyright (c) 2013-2014 Rackspace, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Common utilities for Barbican.
|
|
"""
|
|
import collections
|
|
import importlib
|
|
import mimetypes
|
|
import uuid
|
|
|
|
from oslo_log import log
|
|
from oslo_utils import uuidutils
|
|
import pecan
|
|
import re
|
|
import six
|
|
from six.moves.urllib import parse
|
|
|
|
from barbican.common import config
|
|
from barbican import i18n as u
|
|
|
|
|
|
# Module-level handle on the configuration object exposed by
# barbican.common.config.
CONF = config.CONF
|
|
|
|
|
|
# Version identifier of the API currently served.
API_VERSION = 'v1'
|
|
|
|
# Defined here to break an import cycle.
# barbican.model.models used to import SecretType.OPAQUE from
# barbican.plugin.interface.secret_store, which becomes a cyclic dependency
# as soon as a `secret_store` plugin needs the db model classes. The shared
# value therefore lives in this common module, which both sides already
# import.
SECRET_TYPE_OPAQUE = "opaque"  # nosec
|
|
|
|
|
|
def _do_allow_certain_content_types(func, content_types_list=()):
    """Allow extra content types on a callable handled by pecan.

    Allows you to bypass pecan's content-type restrictions: every entry of
    ``content_types_list`` is added as a key of the ``content_types``
    mapping stored in the pecan configuration of ``func``.

    :param func: callable whose pecan configuration is updated in place.
    :param content_types_list: iterable of content types to allow. The
        default is an immutable empty tuple so that no mutable object is
        shared between calls.
    :returns: the ``func`` that was passed in.
    """
    cfg = pecan.util._cfg(func)
    content_types = cfg.setdefault('content_types', {})
    # Every allowed content type is registered with an empty string value.
    content_types.update((value, '') for value in content_types_list)
    return func
|
|
|
|
|
|
def allow_certain_content_types(*content_types_list):
    """Build a decorator that allows the given content types.

    :param content_types_list: content types to allow, given as positional
        arguments.
    :returns: a one-argument function that passes its argument and the
        collected content types to ``_do_allow_certain_content_types`` and
        returns that call's result.
    """

    def _wrapper(func):
        # content_types_list is the tuple captured from the outer call.
        return _do_allow_certain_content_types(func, content_types_list)

    return _wrapper
|
|
|
|
|
|
def allow_all_content_types(f):
    """Allow every content type found in ``mimetypes.types_map`` on ``f``.

    :param f: callable handed to ``_do_allow_certain_content_types``.
    :returns: whatever ``_do_allow_certain_content_types`` returns.
    """
    every_known_type = mimetypes.types_map.values()
    return _do_allow_certain_content_types(f, every_known_type)
|
|
|
|
|
|
def get_base_url_from_request():
    """Return the base URL used to build references to this service.

    ``CONF.host_href`` is returned unchanged when it is set, and also when
    the current pecan request has no ``application_url`` attribute (some
    unit tests run without a pecan context, hence the attribute check).

    Otherwise the URL is rebuilt from the WSGI request:

    * scheme: the ``HTTP_X_FORWARDED_PROTO`` environ entry (set by
      proxies), falling back to ``http``;
    * host and port: the ``HTTP_HOST`` environ entry, falling back to the
      network location of ``pecan.request.application_url``;
    * path: the path of ``application_url`` with a trailing version
      segment such as ``/v1`` removed.

    :returns: the derived base URL, or the value of ``CONF.host_href``.
    """
    if CONF.host_href or not hasattr(pecan.request, 'application_url'):
        # host_href is set or flow is not within wsgi request context
        return CONF.host_href

    app_url = parse.urlsplit(pecan.request.application_url)
    environ = pecan.request.environ

    # Pecan does not handle X_FORWARDED_PROTO yet, so we need to
    # handle it ourselves. see lp#1445290
    scheme = environ.get('HTTP_X_FORWARDED_PROTO', 'http')

    # Pecan does not handle url reconstruction according to
    # https://www.python.org/dev/peps/pep-0333/#url-reconstruction
    netloc = environ.get('HTTP_HOST', app_url.netloc)
    # FIXME: implement SERVER_NAME lookup if HTTP_HOST is not set

    if not app_url.path:
        return '%s://%s' % (scheme, netloc)

    # Remove the version from the path to extract the base path
    base_path = re.sub(r'/v[0-9\.]+$', '', app_url.path)
    return '%s://%s%s' % (scheme, netloc, base_path)
|
|
|
|
|
|
def hostname_for_refs(resource=None):
    """Return the HATEOAS-style URI reference for this service.

    :param resource: optional resource name appended after the API version.
    :returns: ``<base url>/<API_VERSION>``, followed by ``/<resource>`` when
        a non-empty resource is given.
    """
    versioned_root = '{base}/{version}'.format(
        base=get_base_url_from_request(), version=API_VERSION)
    if not resource:
        return versioned_root
    return versioned_root + '/' + resource
|
|
|
|
|
|
def getLogger(name):
    """Return a logger instance for ``name``.

    Note: Centralize access to the logger to avoid the dreaded
    'ArgsAlreadyParsedError: arguments already parsed: cannot
    register CLI option' error.

    :param name: name handed to ``oslo_log.log.getLogger``.
    """
    logger = log.getLogger(name)
    return logger
|
|
|
|
|
|
def get_accepted_encodings(req):
    """Return the encodings a client accepts, ordered by q value.

    For details see: http://tools.ietf.org/html/rfc2616#section-14.3

    :param req: request object providing ``get_header``.
    :returns: whatever ``get_accepted_encodings_direct`` returns for the
        request's ``Accept-Encoding`` header.
    """
    return get_accepted_encodings_direct(req.get_header('Accept-Encoding'))
|
|
|
|
|
|
def get_accepted_encodings_direct(content_encoding_header):
    """Returns a list of client acceptable encodings sorted by q value.

    For details see: http://tools.ietf.org/html/rfc2616#section-14.3

    :param content_encoding_header: raw Accept-Encoding header value, or
        None when the header is absent.
    :returns: list of codings whose quality is above zero, highest quality
        first (entries of equal quality keep their header order). None is
        returned when the header is None or when an entry is malformed:
        a parameter without ``=``, a quality that is not a float, or a
        quality outside the 0.0-1.0 range.
    """
    if content_encoding_header is None:
        return None

    Encoding = collections.namedtuple('Encoding', ['coding', 'quality'])

    encodings = []
    for enc in content_encoding_header.split(','):
        if ';' not in enc:
            # No parameter given: the coding gets a quality of 1.
            encodings.append(Encoding(enc.strip(), 1))
            continue

        # Split on the first ';' only. An entry carrying several parameters
        # then fails the float conversion below and is reported as
        # malformed, instead of raising an unpacking error to the caller.
        coding, qvalue = enc.split(';', 1)
        try:
            quality = float(qvalue.split('=')[1].strip())
        except (IndexError, ValueError):
            # parameter has no '=' or quality can't be converted to float
            return None

        if quality > 1.0 or quality < 0.0:
            # quality is outside valid range
            return None

        # A quality of zero means the coding is not acceptable: leave it out.
        if quality > 0.0:
            encodings.append(Encoding(coding.strip(), quality))

    # Sort the encodings by quality; the sort is stable.
    encodings.sort(key=lambda e: e.quality, reverse=True)

    return [encoding.coding for encoding in encodings]
|
|
|
|
|
|
def generate_fullname_for(instance):
    """Produce a fully qualified class name for the specified instance.

    :param instance: The instance to generate information from.
    :return: A string providing the package.module information for the
        instance. Instances of built-in types are reported by their bare
        class name.
    :raises: ValueError if the given instance is null
    """
    # Only None counts as a null instance. Falsy objects such as 0, '' or an
    # empty container are real instances and do have a class name.
    if instance is None:
        raise ValueError(u._("Cannot generate a fullname for a null instance"))

    klass = type(instance)
    module = klass.__module__
    class_name = klass.__name__

    # object.__module__ is the name of the interpreter's builtins module, so
    # no compatibility shim is needed to recognise built-in types.
    if module is None or module == object.__module__:
        return class_name
    return "{module}.{class_name}".format(module=module, class_name=class_name)
|
|
|
|
|
|
def get_class_for(module_name, class_name):
    """Resolve a Python class from the names of its module and class.

    :param module_name: dotted name of the module to import.
    :param class_name: name of the attribute to read from that module.
    :returns: the attribute found under ``class_name``.
    :raises ImportError: if the module cannot be loaded.
    :raises AttributeError: if the module has no such attribute.
    """
    return getattr(importlib.import_module(module_name), class_name)
|
|
|
|
|
|
def generate_uuid():
    """Return a new identifier from ``uuidutils.generate_uuid``."""
    new_id = uuidutils.generate_uuid()
    return new_id
|
|
|
|
|
|
def is_multiple_backends_enabled():
    """Return the ``enable_multiple_secret_stores`` configuration value.

    The value is read from the ``secretstore`` group of the 'secretstore'
    module configuration. If that module configuration is not available
    yet (``KeyError``), the secret_store plugin interface is imported to
    make sure the module is initialized, and the lookup is done once more.
    """
    try:
        conf = config.get_module_config('secretstore')
    except KeyError:
        # Ensure module is initialized
        from barbican.plugin.interface import secret_store  # noqa: F401
        conf = config.get_module_config('secretstore')

    return conf.secretstore.enable_multiple_secret_stores
|
|
|
|
|
|
def validate_id_is_uuid(input_id, version=4):
    """Check that the provided id is a uuid of the given version.

    The id is valid only when it parses as a UUID and the string form of the
    parsed value (built with the requested ``version``) is identical to the
    input.

    This validation is to be used only for ids which are generated by barbican
    (e.g. not for keystone project_id)

    :param input_id: the id to check.
    :param version: uuid version the id must have; defaults to 4.
    :returns: True when the id is valid, otherwise False. Any exception
        raised while parsing counts as invalid.
    """
    try:
        parsed = uuid.UUID(input_id, version=version)
    except Exception:
        return False
    else:
        canonical = str(parsed)
        return canonical == input_id
|