
Invalid content type: application/octet-stream. Expected: application/json Change-Id: I50f8142cde85310ea913220a815b667967f9f82b
402 lines
16 KiB
Python
402 lines
16 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import functools
|
|
import json
|
|
|
|
from keystoneauth1 import session
|
|
from keystoneauth1 import token_endpoint
|
|
|
|
import mock
|
|
import requests
|
|
from requests_mock.contrib import fixture
|
|
import six
|
|
from six.moves.urllib import parse
|
|
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
|
|
import testtools
|
|
from testtools import matchers
|
|
import types
|
|
|
|
import glareclient
|
|
from glareclient.common import http
|
|
from glareclient.tests import utils
|
|
|
|
|
|
def original_only(f):
    """Restrict a test method to clients exposing ``log_curl_request``.

    The returned wrapper calls ``self.skipTest`` when ``self.client`` has
    no ``log_curl_request`` attribute, and otherwise delegates to ``f``
    with the same arguments.

    :param f: test method taking ``self`` as its first argument.
    :returns: the wrapping function, carrying ``f``'s metadata.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        if not hasattr(self.client, 'log_curl_request'):
            self.skipTest('Skip logging tests for session client')

        return f(self, *args, **kwargs)

    # A decorator must hand back the wrapper: returning nothing rebinds the
    # decorated test to None, and the runner then never collects it.
    return wrapper
|
|
|
|
|
|
class TestClient(testtools.TestCase):
|
|
|
|
# Each entry is (scenario name, attributes); ``create_client`` names the
# factory method that setUp looks up to build the client under test.
scenarios = [
    ('httpclient', dict(create_client='_create_http_client')),
    ('session', dict(create_client='_create_session_client')),
]
|
|
|
|
def _create_http_client(self):
    """Return an ``http.HTTPClient`` for the test endpoint and token."""
    client = http.HTTPClient(self.endpoint, token=self.token)
    return client
|
|
|
|
def _create_session_client(self):
    """Return an ``http.SessionClient`` built on a keystoneauth1 session.

    The session authenticates with a ``token_endpoint.Token`` plugin made
    from the test endpoint and token.
    """
    token_auth = token_endpoint.Token(self.endpoint, self.token)
    return http.SessionClient(session.Session(auth=token_auth))
|
|
|
|
def setUp(self):
    """Set endpoints and token, install the mock fixture, build client."""
    super(TestClient, self).setUp()

    self.endpoint = 'http://example.com:9292'
    self.ssl_endpoint = 'https://example.com:9292'
    self.token = u'abc123'

    self.mock = self.useFixture(fixture.Fixture())

    # ``create_client`` is the name of a factory method on this class;
    # presumably it is injected per scenario -- see ``scenarios``.
    build_client = getattr(self, self.create_client)
    self.client = build_client()
|
|
|
|
def test_identity_headers_and_token(self):
    """An X-Auth-Token identity header takes priority over ``token``.

    The client is given both; its ``auth_token`` must be the header value
    and the header must no longer be present in ``identity_headers``.
    """
    identity_headers = {
        'X-Auth-Token': 'auth_token',
        'X-User-Id': 'user',
        'X-Tenant-Id': 'tenant',
        'X-Roles': 'roles',
        'X-Identity-Status': 'Confirmed',
        'X-Service-Catalog': 'service_catalog',
    }
    # with token
    kwargs = {'token': u'fake-token',
              'identity_headers': identity_headers}
    http_client_object = http.HTTPClient(self.endpoint, **kwargs)
    self.assertEqual('auth_token', http_client_object.auth_token)
    # assertIsNone reports the unexpected value on failure, whereas
    # assertTrue(... is None) can only say that False is not true.
    self.assertIsNone(
        http_client_object.identity_headers.get('X-Auth-Token'))
|
|
|
|
def test_identity_headers_and_no_token_in_header(self):
    """Without an X-Auth-Token header the ``token`` kwarg is used.

    ``auth_token`` must equal the ``token`` argument and no X-Auth-Token
    entry may appear in the client's ``identity_headers``.
    """
    identity_headers = {
        'X-User-Id': 'user',
        'X-Tenant-Id': 'tenant',
        'X-Roles': 'roles',
        'X-Identity-Status': 'Confirmed',
        'X-Service-Catalog': 'service_catalog',
    }
    # without X-Auth-Token in identity headers
    kwargs = {'token': u'fake-token',
              'identity_headers': identity_headers}
    http_client_object = http.HTTPClient(self.endpoint, **kwargs)
    self.assertEqual(u'fake-token', http_client_object.auth_token)
    # assertIsNone reports the unexpected value on failure, whereas
    # assertTrue(... is None) can only say that False is not true.
    self.assertIsNone(
        http_client_object.identity_headers.get('X-Auth-Token'))
|
|
|
|
def test_identity_headers_and_no_token_in_session_header(self):
    """With no token at all, the session carries no X-Auth-Token."""
    # Neither ``token`` nor an X-Auth-Token identity header is passed when
    # creating the http client, so its auth token must be None and the
    # session headers must not contain the X-Auth-Token key.
    client = http.HTTPClient(
        self.endpoint,
        identity_headers={
            'X-User-Id': 'user',
            'X-Tenant-Id': 'tenant',
            'X-Roles': 'roles',
            'X-Identity-Status': 'Confirmed',
            'X-Service-Catalog': 'service_catalog',
        })
    self.assertIsNone(client.auth_token)
    self.assertNotIn('X-Auth-Token', client.session.headers)
|
|
|
|
def test_identity_headers_are_passed(self):
    """Identity headers given to the client are sent with its requests."""
    # Every identity header handed to the http client at construction
    # time must show up, unchanged, on the outgoing request.
    identity_headers = {
        'X-User-Id': b'user',
        'X-Tenant-Id': b'tenant',
        'X-Roles': b'roles',
        'X-Identity-Status': b'Confirmed',
        'X-Service-Catalog': b'service_catalog',
    }
    kwargs = {'identity_headers': identity_headers}
    http_client = http.HTTPClient(self.endpoint, **kwargs)

    # The same path is registered with the mock and requested below.
    path = '/artifacts/my-image'
    self.mock.get(self.endpoint + path)
    http_client.get(path)

    headers = self.mock.last_request.headers
    for k, v in six.iteritems(identity_headers):
        self.assertEqual(v, headers[k])
|
|
|
|
def test_language_header_passed(self):
    """The ``language_header`` kwarg is sent as Accept-Language."""
    language = 'nb_NO'
    http_client = http.HTTPClient(self.endpoint, language_header=language)

    path = '/v2/images/my-image'
    self.mock.get(self.endpoint + path)
    http_client.get(path)

    sent_headers = self.mock.last_request.headers
    self.assertEqual(language, sent_headers['Accept-Language'])
|
|
|
|
def test_language_header_not_passed_no_language(self):
    """No Accept-Language header is sent when no language is given."""
    kwargs = {}
    http_client = http.HTTPClient(self.endpoint, **kwargs)

    path = '/v2/images/my-image'
    self.mock.get(self.endpoint + path)
    http_client.get(path)

    headers = self.mock.last_request.headers
    # assertNotIn prints the container on failure; assertTrue on a
    # ``not in`` expression would only report that False is not true.
    self.assertNotIn('Accept-Language', headers)
|
|
|
|
def test_connection_timeout(self):
    """Verify an InvalidEndpoint is received if connection times out."""
    def raise_timeout(request, context):
        raise requests.exceptions.Timeout

    path = '/v1/images'
    self.mock.get(self.endpoint + path, text=raise_timeout)

    error = self.assertRaises(
        glareclient.exc.InvalidEndpoint, self.client.get, path)
    self.assertIn(self.endpoint, error.message)
|
|
|
|
def test_connection_refused(self):
    """Verify a CommunicationError is received if connection is refused.

    The error message is expected to contain the endpoint that refused
    the connection.
    """
    def refuse_connection(request, context):
        raise requests.exceptions.ConnectionError()

    path = '/artifacts/?limit=20'
    self.mock.get(self.endpoint + path, text=refuse_connection)

    error = self.assertRaises(
        glareclient.exc.CommunicationError, self.client.get, path)

    self.assertIn(self.endpoint, error.message)
|
|
|
|
def test_http_encoding(self):
    """A GET with a non-ASCII header value returns the mocked text."""
    expected_text = 'Ok'
    path = '/artifacts/'
    self.mock.get(self.endpoint + path,
                  headers={"Content-Type": "text/plain"},
                  text=expected_text)

    resp, _body = self.client.get(path, headers={"test": u'ni\xf1o'})
    self.assertEqual(expected_text, resp.text)
|
|
|
|
def test_request_id(self):
    """The client records the response's x-openstack-request-id."""
    path = '/artifacts/'
    self.mock.get(self.endpoint + path,
                  headers={"x-openstack-request-id": "req-aaa"})

    self.client.get(path)
    # Expected value first, as in every other assertEqual in this class,
    # so a failure labels the reference and actual values correctly.
    self.assertEqual('req-aaa', self.client.last_request_id)
|
|
|
|
def test_headers_encoding(self):
    """encode_headers UTF-8 encodes values and drops None-valued keys."""
    value = u'ni\xf1o'
    headers = {"test": value, "none-val": None}
    encoded = http.encode_headers(headers)
    self.assertEqual(b"ni\xc3\xb1o", encoded[b"test"])
    # The encoded dict is indexed with bytes keys above, so the absence
    # check must use bytes too: on Python 3 a str key never equals a
    # bytes key, which would make the assertion pass unconditionally.
    self.assertNotIn(b"none-val", encoded)
|
|
|
|
def test_raw_request(self):
    """Verify the path being used for HTTP requests reflects accurately."""
    path = '/artifacts/'
    text = 'Ok'
    headers = {"Content-Type": "text/plain"}

    self.mock.get(self.endpoint + path, text=text, headers=headers)

    resp, _body = self.client.get(path, headers=headers)
    self.assertEqual(headers, resp.headers)
    self.assertEqual(text, resp.text)
|
|
|
|
def test_parse_endpoint(self):
    """parse_endpoint returns the endpoint URL as a ``SplitResult``."""
    endpoint = 'http://example.com:9292'
    test_client = http.HTTPClient(endpoint, token=u'adc123')
    parsed = test_client.parse_endpoint(endpoint)
    self.assertEqual(
        parse.SplitResult(scheme='http', netloc='example.com:9292',
                          path='', query='', fragment=''),
        parsed)
|
|
|
|
def test_get_connections_kwargs_http(self):
    """A client created without a timeout argument reports 600.0."""
    test_client = http.HTTPClient('http://example.com:9292',
                                  token=u'adc123')
    self.assertEqual(600.0, test_client.timeout)
|
|
|
|
def test__chunk_body_exact_size_chunk(self):
    """A body of exactly CHUNKSIZE bytes is yielded as one chunk."""
    base_client = http._BaseHTTPClient()
    payload = b'x' * http.CHUNKSIZE
    chunks = list(base_client._chunk_body(six.BytesIO(payload)))
    self.assertEqual(1, len(chunks))
    self.assertEqual([payload], chunks)
|
|
|
|
def test_http_chunked_request(self):
    """Posting a file-like object sends the body as a generator."""
    text = "Ok"
    stream = six.StringIO(text)
    path = '/artifacts'
    self.mock.post(self.endpoint + path, text=text)

    resp, _body = self.client.post(
        path, headers={"test": u'chunked_request'}, data=stream)
    sent_body = self.mock.last_request.body
    self.assertIsInstance(sent_body, types.GeneratorType)
    self.assertEqual(text, resp.text)
|
|
|
|
def test_http_json(self):
    """Posting a dict sends it as a JSON-encoded string body."""
    payload = {"test": "json_request"}
    path = '/artifacts'
    text = 'OK'
    self.mock.post(self.endpoint + path, text=text)

    resp, _body = self.client.post(
        path, headers={"test": u'chunked_request'}, data=payload)

    self.assertEqual(text, resp.text)
    sent_body = self.mock.last_request.body
    self.assertIsInstance(sent_body, six.string_types)
    self.assertEqual(payload, json.loads(sent_body))
|
|
|
|
@original_only
def test_log_http_response_with_non_ascii_char(self):
    """log_http_response must not raise UnicodeDecodeError."""
    try:
        # One header value deliberately contains non-ASCII characters.
        fake_response = utils.FakeResponse(
            {"Content-Type": "text/plain", "test": "value1\xa5\xa6"},
            six.StringIO('Ok'))
        self.client.log_http_response(fake_response)
    except UnicodeDecodeError as e:
        self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
|
|
|
|
@original_only
def test_log_curl_request_with_non_ascii_char(self):
    """log_curl_request must not raise UnicodeDecodeError."""
    try:
        # URL, header value and body all carry non-ASCII characters.
        non_ascii_headers = {'header1': 'value1\xa5\xa6'}
        non_ascii_body = 'examplebody\xa5\xa6'
        self.client.log_curl_request(
            'GET', '/api/v1/\xa5', non_ascii_headers, non_ascii_body, None)
    except UnicodeDecodeError as e:
        self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
|
|
|
|
@original_only
@mock.patch('glareclient.common.http.LOG.debug')
def test_log_curl_request_with_body_and_header(self, mock_log):
    """The logged curl command contains the header and the body.

    :param mock_log: mock replacing ``glareclient.common.http.LOG.debug``.
    """
    hd_name = 'header1'
    hd_val = 'value1'
    headers = {hd_name: hd_val}
    body = 'examplebody'
    self.client.log_curl_request('GET', '/api/v1/', headers, body, None)
    self.assertTrue(mock_log.called, 'LOG.debug never called')
    self.assertTrue(mock_log.call_args[0],
                    'LOG.debug called with no arguments')
    # Raw strings: ``\s`` in a plain literal is an invalid escape sequence
    # that newer interpreters warn about; the pattern text is unchanged.
    hd_regex = r".*\s-H\s+'\s*%s\s*:\s*%s\s*'.*" % (hd_name, hd_val)
    self.assertThat(mock_log.call_args[0][0],
                    matchers.MatchesRegex(hd_regex),
                    'header not found in curl command')
    body_regex = r".*\s-d\s+'%s'\s.*" % body
    self.assertThat(mock_log.call_args[0][0],
                    matchers.MatchesRegex(body_regex),
                    'body not found in curl command')
|
|
|
|
def _test_log_curl_request_with_certs(self, mock_log, key, cert, cacert):
    """Check which certificate options appear in the logged curl command.

    A truthy value must appear after its ``--<option>`` flag (quoted or
    not); a falsy value means the flag must be absent.

    :param mock_log: mock replacing ``glareclient.common.http.LOG.debug``.
    :param key: value passed to the client as ``key_file``.
    :param cert: value passed to the client as ``cert_file``.
    :param cacert: value passed to the client as ``cacert``.
    """
    headers = {'header1': 'value1'}
    http_client_object = http.HTTPClient(self.ssl_endpoint, key_file=key,
                                         cert_file=cert, cacert=cacert,
                                         token='fake-token')
    http_client_object.log_curl_request('GET', '/api/v1/', headers, None,
                                        None)
    self.assertTrue(mock_log.called, 'LOG.debug never called')
    self.assertTrue(mock_log.call_args[0],
                    'LOG.debug called with no arguments')

    needles = {'key': key, 'cert': cert, 'cacert': cacert}
    for option, value in six.iteritems(needles):
        # Raw strings: ``\s`` in a plain literal is an invalid escape
        # sequence that newer interpreters warn about; the resulting
        # pattern text is unchanged.
        if value:
            regex = r".*\s--%s\s+('%s'|%s).*" % (option, value, value)
            self.assertThat(mock_log.call_args[0][0],
                            matchers.MatchesRegex(regex),
                            'no --%s option in curl command' % option)
        else:
            regex = r".*\s--%s\s+.*" % option
            self.assertThat(mock_log.call_args[0][0],
                            matchers.Not(matchers.MatchesRegex(regex)),
                            'unexpected --%s option in curl command' %
                            option)
|
|
|
|
@mock.patch('glareclient.common.http.LOG.debug')
def test_log_curl_request_with_all_certs(self, mock_log):
    """Run the certificate check with key, cert and cacert all given."""
    self._test_log_curl_request_with_certs(
        mock_log, key='key1', cert='cert1', cacert='cacert2')
|
|
|
|
@mock.patch('glareclient.common.http.LOG.debug')
def test_log_curl_request_with_some_certs(self, mock_log):
    """Run the certificate check with a key and cert but no cacert."""
    self._test_log_curl_request_with_certs(
        mock_log, key='key1', cert='cert1', cacert=None)
|
|
|
|
@mock.patch('glareclient.common.http.LOG.debug')
def test_log_curl_request_with_insecure_param(self, mock_log):
    """A client created with ``insecure=True`` logs curl's ``-k`` flag.

    :param mock_log: mock replacing ``glareclient.common.http.LOG.debug``.
    """
    headers = {'header1': 'value1'}
    http_client_object = http.HTTPClient(self.ssl_endpoint, insecure=True,
                                         token='fake-token')
    http_client_object.log_curl_request('GET', '/api/v1/', headers, None,
                                        None)
    self.assertTrue(mock_log.called, 'LOG.debug never called')
    self.assertTrue(mock_log.call_args[0],
                    'LOG.debug called with no arguments')
    # Raw string: ``\s`` in a plain literal is an invalid escape sequence
    # that newer interpreters warn about; the pattern text is unchanged.
    self.assertThat(mock_log.call_args[0][0],
                    matchers.MatchesRegex(r'.*\s-k\s.*'),
                    'no -k option in curl command')
|
|
|
|
@mock.patch('glareclient.common.http.LOG.debug')
def test_log_curl_request_with_token_header(self, mock_log):
    """The X-Auth-Token value must not appear in the logged curl line."""
    fake_token = 'fake-token'
    headers = {'X-Auth-Token': fake_token}
    client = http.HTTPClient(self.endpoint, identity_headers=headers)
    client.log_curl_request('GET', '/api/v1/', headers, None, None)

    self.assertTrue(mock_log.called, 'LOG.debug never called')
    positional = mock_log.call_args[0]
    self.assertTrue(positional, 'LOG.debug called with no arguments')

    # The first positional argument to LOG.debug is the text inspected.
    contains_token = matchers.MatchesRegex('.*%s.*' % fake_token)
    self.assertThat(positional[0],
                    matchers.Not(contains_token),
                    'token found in LOG.debug parameter')
|
|
|
|
def test_expired_token_has_changed(self):
    """Changing ``auth_token`` changes the X-Auth-Token header sent."""
    path = '/artifacts'

    def token_sent_by_get():
        # Issue a GET and report the token header that request carried.
        http_client.get(path)
        return self.mock.last_request.headers['X-Auth-Token']

    # instantiate client with some token
    fake_token = b'fake-token'
    http_client = http.HTTPClient(self.endpoint, token=fake_token)
    self.mock.get(self.endpoint + path)
    self.assertEqual(fake_token, token_sent_by_get())

    # refresh the token
    refreshed_token = b'refreshed-token'
    http_client.auth_token = refreshed_token
    self.assertEqual(refreshed_token, token_sent_by_get())

    # regression check for bug 1448080
    unicode_token = u'ni\xf1o'
    http_client.auth_token = unicode_token
    self.assertEqual(b'ni\xc3\xb1o', token_sent_by_get())
|