Remove remaining ignores for flake8

- cleanup code throughout to pass tests

Change-Id: I3a2a0f79940924dc954051b8fea0497a43579e98
This commit is contained in:
Bryan D. Payne 2015-02-20 09:11:17 -08:00
parent 073cec09db
commit f972213a42
16 changed files with 91 additions and 125 deletions

View File

@ -12,7 +12,6 @@
# under the License.
from cryptography.hazmat.backends.openssl import backend
import errors
import message_digest
import name

View File

@ -12,7 +12,6 @@
# under the License.
from cryptography.hazmat.backends.openssl import backend
import errors

View File

@ -12,11 +12,10 @@
# under the License.
import certificate
from cryptography.hazmat.backends.openssl import backend
import errors
import name
from cryptography.hazmat.backends.openssl import backend
class X509CsrError(errors.X509Error):
def __init__(self, what):

View File

@ -13,8 +13,7 @@
import paste
from paste import translogger # noqa
from pecan import make_app
import pecan
import validators
@ -76,7 +75,7 @@ def setup_app(config):
validate_config(config)
app = make_app(
app = pecan.make_app(
app_conf.pop('root'),
logging=getattr(config, 'logging', {}),
**app_conf

View File

@ -11,8 +11,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from pecan import abort
from pecan import conf
import pecan
# One time, on import, we want to safely build a list of the auth
# modules listed in the config that we should be using for validate.
@ -21,7 +21,7 @@ from pecan import conf
# imcomplete or malformed.
AUTH_MODULES = []
try:
for auth_type in conf.to_dict().get('auth', {}).keys():
for auth_type in pecan.conf.to_dict().get('auth', {}).keys():
try:
module_name = "{}.{}".format(__name__, auth_type)
module = __import__(module_name, fromlist=[''])
@ -49,4 +49,4 @@ def validate(user, secret):
return res
# we should only get here if a module failed to abort
abort(401, "authentication failure")
pecan.abort(401, "authentication failure")

View File

@ -14,11 +14,12 @@
import json
import logging
from .results import AuthDetails
from pecan import conf
import pecan
import requests
from anchor.auth import results
logger = logging.getLogger(__name__)
@ -34,7 +35,7 @@ def login(_, token):
"token": {
"id": token
}}}})
req = requests.post(conf.auth['keystone']['url'] + '/v3/auth/tokens',
req = requests.post(pecan.conf.auth['keystone']['url'] + '/v3/auth/tokens',
headers={'Content-Type': 'application/json'},
data=data)
if req.status_code != 200:
@ -51,4 +52,4 @@ def login(_, token):
logger.exception("Keystone response was not in the expected format")
return None
return AuthDetails(username=user, groups=roles)
return results.AuthDetails(username=user, groups=roles)

View File

@ -11,11 +11,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from .results import AuthDetails
import ldap
import ldap.filter
from pecan import conf
import pecan
from anchor.auth import results
def user_get_groups(attributes):
@ -36,18 +36,20 @@ def login(user, secret):
:param secret: Secret/Passphrase
:returns: AuthDetails -- Class used for authentication information
"""
ldo = ldap.initialize("ldap://%s" % (conf.auth['ldap']['host'],))
ldo = ldap.initialize("ldap://%s" % (pecan.conf.auth['ldap']['host'],))
ldo.set_option(ldap.OPT_REFERRALS, 0)
try:
ldo.simple_bind_s("%s@%s" % (user, conf.auth['ldap']['domain']),
ldo.simple_bind_s("%s@%s" % (user, pecan.conf.auth['ldap']['domain']),
secret)
filter_str = ('(sAMAccountName=%s)' %
ldap.filter.escape_filter_chars(user))
ret = ldo.search_s(conf.auth['ldap']['base'], ldap.SCOPE_SUBTREE,
filterstr=filter_str, attrlist=['memberOf'])
ret = ldo.search_s(pecan.conf.auth['ldap']['base'],
ldap.SCOPE_SUBTREE,
filterstr=filter_str,
attrlist=['memberOf'])
user_attrs = [x for x in ret if x[0] is not None][0][1]
user_groups = user_get_groups(user_attrs)
return AuthDetails(username=user, groups=user_groups)
return results.AuthDetails(username=user, groups=user_groups)
except ldap.INVALID_CREDENTIALS:
return None

View File

@ -11,6 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import collections
AuthDetails = namedtuple('AuthDetails', ['username', 'groups'])
AuthDetails = collections.namedtuple('AuthDetails', ['username', 'groups'])

View File

@ -13,9 +13,9 @@
import logging
from pecan import conf
import pecan
from anchor.auth.results import AuthDetails
from anchor.auth import results
from anchor import util
@ -44,8 +44,8 @@ def login(user, secret):
# expected values
try:
e_user = str(conf.auth['static']['user'])
e_pass = str(conf.auth['static']['secret'])
e_user = str(pecan.conf.auth['static']['user'])
e_pass = str(pecan.conf.auth['static']['secret'])
except (KeyError, TypeError):
logger.warn("auth conf missing static user or secret")
return None
@ -69,6 +69,6 @@ def login(user, secret):
# do not see an obvious solution to this problem, but also believe
# that leaking which input was valid isn't as big of a concern.
if valid_user and valid_pass:
return AuthDetails(username=e_user, groups=[])
return results.AuthDetails(username=e_user, groups=[])
logger.info("failed static auth for user {}".format(user))

View File

@ -11,20 +11,18 @@
# License for the specific language governing permissions and limitations
# under the License.
from X509 import certificate
from X509 import signing_request
from X509 import utils as X509_utils
import logging
import os
import sys
import time
import uuid
from pecan import abort
from pecan import conf
import pecan
from . import validators
from anchor import validators
from anchor.X509 import certificate
from anchor.X509 import signing_request
from anchor.X509 import utils as X509_utils
logger = logging.getLogger(__name__)
@ -45,11 +43,11 @@ def parse_csr(csr, encoding):
# validate untrusted input
if str(encoding).lower() not in VALID_ENCODINGS:
logger.error("parse_csr failed: bad encoding ({})".format(encoding))
abort(400, "invalid CSR")
pecan.abort(400, "invalid CSR")
if csr is None:
logger.error("parse_csr failed: missing CSR")
abort(400, "invalid CSR")
pecan.abort(400, "invalid CSR")
# load the CSR into the backend X509 library
try:
@ -57,21 +55,21 @@ def parse_csr(csr, encoding):
out_req.from_buffer(csr)
return out_req
except Exception as e:
logger.exception("parse_csr exception while parsing the CSR: %s", e)
abort(400, "invalid CSR")
logger.exception("Exception while parsing the CSR: %s", e)
pecan.abort(400, "CSR cannot be parsed")
def validate_csr(auth_result, csr, request):
args = {'auth_result': auth_result,
'csr': csr,
'conf': conf,
'conf': pecan.conf,
'request': request}
# It is ok if the config doesn't have any validators listed
# so we set the initial state to valid.
valid = True
for validator_set in conf.validators:
for validator_set in pecan.conf.validators:
logger.debug("Checking validators set <%s>",
validator_set.get("name"))
@ -114,32 +112,32 @@ def validate_csr(auth_result, csr, request):
return
# something failed, return a 400 to the client
abort(400, "CSR failed validation")
pecan.abort(400, "CSR failed validation")
def sign(csr):
try:
ca = certificate.X509Certificate()
ca.from_file(conf.ca["cert_path"])
ca.from_file(pecan.conf.ca["cert_path"])
except Exception as e:
logger.exception("Cannot load the signing CA: %s", e)
abort(500, "certificate signing error")
pecan.abort(500, "certificate signing error")
try:
key_data = None
with open(conf.ca["key_path"]) as f:
with open(pecan.conf.ca["key_path"]) as f:
key_data = f.read()
key = X509_utils.load_pem_private_key(key_data)
except Exception as e:
logger.exception("Cannot load the signing CA key: %s", e)
abort(500, "certificate signing error")
pecan.abort(500, "certificate signing error")
new_cert = certificate.X509Certificate()
new_cert.set_version(2)
start_time = int(time.time())
end_time = start_time + (conf.ca['valid_hours'] * 60 * 60)
end_time = start_time + (pecan.conf.ca['valid_hours'] * 60 * 60)
new_cert.set_not_before(start_time)
new_cert.set_not_after(end_time)
@ -159,11 +157,11 @@ def sign(csr):
logger.info("Signing certificate for <%s> with serial <%s>",
csr.get_subject(), serial)
new_cert.sign(key, conf.ca['signing_hash'])
new_cert.sign(key, pecan.conf.ca['signing_hash'])
path = os.path.join(
conf.ca['output_path'],
'%s.crt' % new_cert.get_fingerprint(conf.ca['signing_hash']))
pecan.conf.ca['output_path'],
'%s.crt' % new_cert.get_fingerprint(pecan.conf.ca['signing_hash']))
logger.info("Saving certificate to: %s", path)
new_cert.save(path)
@ -174,4 +172,4 @@ def sign(csr):
if cert:
return cert
abort(500, "certificate signing error")
pecan.abort(500, "certificate signing error")

View File

@ -11,38 +11,38 @@
# License for the specific language governing permissions and limitations
# under the License.
from pecan import expose
from pecan import request
from pecan.rest import RestController
from .. import auth
from .. import certificate_ops
import logging
import pecan
from pecan import rest
from anchor import auth
from anchor import certificate_ops
logger = logging.getLogger(__name__)
class RobotsController(RestController):
class RobotsController(rest.RestController):
"""Serves /robots.txt that disallows search bots."""
@expose(content_type="text/plain")
@pecan.expose(content_type="text/plain")
def get(self):
return "User-agent: *\nDisallow: /\n"
class SignController(RestController):
class SignController(rest.RestController):
"""Handles POST requests to /sign."""
@expose(content_type="text/plain")
@pecan.expose(content_type="text/plain")
def post(self):
auth_result = auth.validate(request.POST.get('user'),
request.POST.get('secret'))
auth_result = auth.validate(pecan.request.POST.get('user'),
pecan.request.POST.get('secret'))
csr = certificate_ops.parse_csr(request.POST.get('csr'),
request.POST.get('encoding'))
csr = certificate_ops.parse_csr(pecan.request.POST.get('csr'),
pecan.request.POST.get('encoding'))
certificate_ops.validate_csr(auth_result, csr, request)
certificate_ops.validate_csr(auth_result, csr, pecan.request)
return certificate_ops.sign(csr)

View File

@ -1,15 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -17,8 +17,7 @@
import unittest
import mock
from webob.exc import HTTPUnauthorized
from webob import exc as http_status
class AuthStaticTests(unittest.TestCase):
@ -38,18 +37,18 @@ class AuthStaticTests(unittest.TestCase):
with mock.patch.dict(config, data):
# can't import until mock'd
from anchor import auth
from anchor.auth.results import AuthDetails
from anchor.auth import results
valid_user = data['auth']['static']['user']
valid_pass = data['auth']['static']['secret']
expected = AuthDetails(username=valid_user, groups=[])
expected = results.AuthDetails(username=valid_user, groups=[])
self.assertEqual(auth.validate(valid_user, valid_pass), expected)
with self.assertRaises(HTTPUnauthorized):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate(valid_user, 'badpass')
with self.assertRaises(HTTPUnauthorized):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', valid_pass)
with self.assertRaises(HTTPUnauthorized):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', 'badpass')
def test_validate_static_malformed1(self):
@ -60,7 +59,7 @@ class AuthStaticTests(unittest.TestCase):
with mock.patch.dict(config, data):
# can't import until mock'd
from anchor import auth
with self.assertRaises(HTTPUnauthorized):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', 'badpass')
def test_validate_static_malformed2(self):
@ -71,5 +70,5 @@ class AuthStaticTests(unittest.TestCase):
with mock.patch.dict(config, data):
# can't import until mock'd
from anchor import auth
with self.assertRaises(HTTPUnauthorized):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', 'badpass')

View File

@ -14,15 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from anchor.app import ConfigValidationException
from anchor.app import validate_config
import bad_config_domains
import good_config_domains
from anchor import app
class TestValidDN(unittest.TestCase):
@ -36,11 +34,11 @@ class TestValidDN(unittest.TestCase):
self.assertTrue(True)
def test_config_check_domains_good(self):
self.assertEqual(validate_config(good_config_domains), None)
self.assertEqual(app.validate_config(good_config_domains), None)
def test_config_check_domains_bad(self):
self.assertRaises(
ConfigValidationException,
validate_config,
app.ConfigValidationException,
app.validate_config,
bad_config_domains
)

View File

@ -14,13 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import textwrap
import unittest
import textwrap
import mock
from webob.exc import HTTPClientError
from webob import exc as http_status
from anchor import certificate_ops
@ -79,23 +77,19 @@ class CertificateOpsTests(unittest.TestCase):
self.assertEqual(actual_cn, self.expected_cn)
def test_parse_csr_fail1(self):
"""Test invalid CSR format (wrong value) for parse_csr."""
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.parse_csr(self.csr, 'blah')
def test_parse_csr_fail2(self):
"""Test invalid CSR format (wrong type) for parse_csr."""
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.parse_csr(self.csr, True)
def test_parse_csr_fail3(self):
"""Test invalid CSR (None) format for parse_csr."""
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.parse_csr(None, 'pem')
def test_parse_csr_fail4(self):
"""Test invalid CSR (wrong value) format for parse_csr."""
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.parse_csr('invalid csr input', 'pem')
def test_validate_csr_success(self):
@ -126,7 +120,7 @@ class CertificateOpsTests(unittest.TestCase):
data = {'validators': validators}
with mock.patch.dict(config, data):
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.validate_csr(None, None, None)
def test_validate_csr_fail3(self):
@ -136,7 +130,7 @@ class CertificateOpsTests(unittest.TestCase):
data = {'validators': validators}
with mock.patch.dict(config, data):
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.validate_csr(None, None, None)
def test_validate_csr_fail4(self):
@ -146,7 +140,7 @@ class CertificateOpsTests(unittest.TestCase):
data = {'validators': validators}
with mock.patch.dict(config, data):
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.validate_csr(None, None, None)
def test_validate_csr_fail5(self):
@ -157,5 +151,5 @@ class CertificateOpsTests(unittest.TestCase):
data = {'validators': validators}
with mock.patch.dict(config, data):
with self.assertRaises(HTTPClientError):
with self.assertRaises(http_status.HTTPClientError):
certificate_ops.validate_csr(None, csr_obj, None)

View File

@ -30,15 +30,7 @@ commands =
coverage xml
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
# H303 no wild card imports
# F403 unable to detect undefined names
# H104 file contains nothing but comments
# H302 import only modules
# H307 like imports should be grouped together
# H304 no relative imports
show-source = True
ignore = E123,E125,H303,F403,H104,H302,H307,H304
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build
max-complexity=25