
Taken from https://github.com/dshakhray/python-glareclient Change-Id: If0e7e5cd0e39281f725df21308a18ed6caa8a009
227 lines
8.9 KiB
Python
227 lines
8.9 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
|
|
import mock
|
|
import six
|
|
import ssl
|
|
import testtools
|
|
import threading
|
|
|
|
from glareclient import Client
|
|
from glareclient import exc
|
|
from glareclient import v1
|
|
|
|
# The module is called ``socketserver`` on Python 3 and ``SocketServer`` on
# Python 2; bind whichever one is available to the Python 3 name.
try:
    import socketserver
except ImportError:
    import SocketServer as socketserver
|
|
|
|
|
|
# Absolute path of the 'var' directory that sits next to this module; the
# key and certificate files used by the tests are looked up inside it.
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                            'var'))
|
|
|
|
|
|
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Request handler that answers any request with a fixed byte string."""

    def handle(self):
        """Read up to 1024 bytes from the peer and reply with b'somebytes'.

        The received data is discarded; only the fixed response is sent.
        """
        self.request.recv(1024)
        response = b'somebytes'
        self.request.sendall(response)
|
|
|
|
|
|
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Threaded TCP server that wraps each accepted connection in TLS.

    The server presents 'certificate.crt' / 'privatekey.key' from
    TEST_VAR_DIR and requires the peer to present a certificate that
    validates against 'ca.crt' from the same directory.
    """

    def get_request(self):
        """Accept a connection and wrap it in a server-side TLS socket.

        :returns: a ``(ssl_socket, client_address)`` tuple, as expected
                  from ``get_request`` by the socketserver machinery.
        """
        key_file = os.path.join(TEST_VAR_DIR, 'privatekey.key')
        cert_file = os.path.join(TEST_VAR_DIR, 'certificate.crt')
        cacert = os.path.join(TEST_VAR_DIR, 'ca.crt')
        (_sock, addr) = socketserver.TCPServer.get_request(self)

        if hasattr(ssl, 'SSLContext'):
            # ssl.wrap_socket() was deprecated in Python 3.7 and removed
            # in 3.12, so build an equivalent context explicitly.
            protocol = getattr(ssl, 'PROTOCOL_TLS_SERVER', None)
            if protocol is None:
                # Older interpreters only know the generic negotiation
                # constant.
                protocol = ssl.PROTOCOL_SSLv23
            context = ssl.SSLContext(protocol)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(cacert)
            context.load_cert_chain(cert_file, key_file)
            sock = context.wrap_socket(_sock, server_side=True)
        else:
            # Interpreters that predate ssl.SSLContext.
            sock = ssl.wrap_socket(_sock,
                                   certfile=cert_file,
                                   keyfile=key_file,
                                   ca_certs=cacert,
                                   server_side=True,
                                   cert_reqs=ssl.CERT_REQUIRED)
        return sock, addr
|
|
|
|
|
|
class TestHTTPSVerifyCert(testtools.TestCase):
    """Check 'requests' based ssl verification occurs.

    The requests library performs SSL certificate validation,
    however there is still a need to check that the glare
    client is properly integrated with requests so that
    cert validation actually happens.
    """

    def setUp(self):
        """Start a TLS test server on an ephemeral local port."""
        # Rather than spinning up a new process, we create
        # a thread to perform client/server interaction.
        # This should run more quickly.
        super(TestHTTPSVerifyCert, self).setUp()
        server = ThreadedTCPServer(('127.0.0.1', 0),
                                   ThreadedTCPRequestHandler)
        # Handler threads must not keep server_close() or interpreter
        # exit waiting on a peer that never finishes its request.
        server.daemon_threads = True
        # Cleanups run in reverse order of registration: the serving loop
        # is stopped first (registered below), then the listening socket
        # is released.
        self.addCleanup(server.server_close)
        __, self.port = server.server_address
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        self.addCleanup(server.shutdown)

    def _server_url(self):
        """Return the HTTPS URL of the server started in setUp()."""
        return 'https://0.0.0.0:%d' % self.port

    @mock.patch('sys.stderr')
    def test_v1_requests_cert_verification(self, __):
        """v1 regression test for bug 115260."""
        url = self._server_url()

        try:
            client = v1.Client(url,
                               insecure=False,
                               ssl_compression=True)
            client.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            if 'certificate verify failed' not in e.message:
                self.fail('No certificate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')
        else:
            # Kept out of the try body: self.fail() raises, and the
            # generic handler above would otherwise catch that and
            # report the wrong failure message.
            self.fail('No SSL exception has been raised')

    @mock.patch('sys.stderr')
    def test_v1_requests_cert_verification_no_compression(self, __):
        """v1 regression test for bug 115260."""
        # Legacy test. Verify 'no compression' has no effect
        url = self._server_url()

        try:
            client = v1.Client(url,
                               insecure=False,
                               ssl_compression=False)
            client.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            if 'certificate verify failed' not in e.message:
                self.fail('No certificate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')
        else:
            # See test_v1_requests_cert_verification for why this call
            # lives in the else clause.
            self.fail('No SSL exception has been raised')

    @mock.patch('sys.stderr')
    def test_v1_requests_valid_cert_verification(self, __):
        """Test that a valid CA cert yields no verification failure."""
        url = self._server_url()
        cacert = os.path.join(TEST_VAR_DIR, 'ca.crt')

        try:
            gc = Client('1', url,
                        insecure=False,
                        ssl_compression=True,
                        cacert=cacert)
            gc.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            # Other communication errors are tolerated here; only a
            # certificate verification failure fails the test.
            if 'certificate verify failed' in e.message:
                self.fail('Certificate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')

    @mock.patch('sys.stderr')
    def test_v1_requests_valid_cert_verification_no_compression(self, __):
        """Same as the valid CA cert test, with ssl_compression=False."""
        url = self._server_url()
        cacert = os.path.join(TEST_VAR_DIR, 'ca.crt')

        try:
            gc = Client('1', url,
                        insecure=False,
                        ssl_compression=False,
                        cacert=cacert)
            gc.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            if 'certificate verify failed' in e.message:
                self.fail('Certificate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')

    @mock.patch('sys.stderr')
    def test_v1_requests_valid_cert_no_key(self, __):
        """Test VerifiedHTTPSConnection: absence of SSL key file."""
        url = self._server_url()
        cert_file = os.path.join(TEST_VAR_DIR, 'certificate.crt')
        cacert = os.path.join(TEST_VAR_DIR, 'ca.crt')

        # NOTE(review): the test also passes when no exception is raised
        # at all -- confirm whether that is intended.
        try:
            gc = Client('1', url,
                        insecure=False,
                        ssl_compression=False,
                        cert_file=cert_file,
                        cacert=cacert)
            gc.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            if 'PEM lib' not in e.message:
                self.fail('No appropriate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')

    @mock.patch('sys.stderr')
    def test_v1_requests_bad_cert(self, __):
        """Test the error reported for the 'badcert.crt' client cert."""
        url = self._server_url()
        cert_file = os.path.join(TEST_VAR_DIR, 'badcert.crt')
        cacert = os.path.join(TEST_VAR_DIR, 'ca.crt')

        try:
            gc = Client('1', url,
                        insecure=False,
                        ssl_compression=False,
                        cert_file=cert_file,
                        cacert=cacert)
            gc.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            # NOTE(dsariel)
            # starting from python 2.7.8 the way to handle loading private
            # keys into the SSL_CTX was changed and error message become
            # similar to the one in 3.X
            if ((six.PY2 and 'PrivateKey' not in e.message and
                    'PEM lib' not in e.message) or
                    (six.PY3 and 'PEM lib' not in e.message)):
                self.fail('No appropriate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')

    @mock.patch('sys.stderr')
    def test_v1_requests_bad_ca(self, __):
        """Test the error reported when 'badca.crt' is used as CA cert."""
        url = self._server_url()
        cacert = os.path.join(TEST_VAR_DIR, 'badca.crt')

        try:
            gc = Client('1', url,
                        insecure=False,
                        ssl_compression=False,
                        cacert=cacert)
            gc.artifacts.get('123', type_name='sample_artifact')
        except exc.CommunicationError as e:
            # NOTE(dsariel)
            # starting from python 2.7.8 the way of handling x509 certificates
            # was changed (github.com/python/peps/blob/master/pep-0476.txt#L28)
            # and error message become similar to the one in 3.X
            if ((six.PY2 and 'certificate' not in e.message and
                    'No such file' not in e.message) or
                    (six.PY3 and 'No such file' not in e.message)):
                self.fail('No appropriate failure message is received')
        except Exception:
            self.fail('Unexpected exception has been raised')
|