Initial tests

This commit is contained in:
lvdongbing 2016-04-20 21:25:31 -04:00
parent ccb01f95d5
commit 1d3450c8cf
20 changed files with 2553 additions and 833 deletions

4
.testr.conf Normal file
View File

@ -0,0 +1,4 @@
[DEFAULT]
test_command=${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./bileanclient/tests/unit} $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -17,14 +17,14 @@
import pbr.version
from bileanclient import client
from bileanclient.client import Client
from bileanclient import exc as exceptions
__version__ = pbr.version.VersionInfo('python-bileanclient').version_string()
__all__ = [
'client',
'Client',
'exc',
'exceptions',
]

45
bileanclient/_i18n.py Normal file
View File

@ -0,0 +1,45 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""oslo.i18n integration module.
See http://docs.openstack.org/developer/oslo.i18n/usage.html
"""
try:
import oslo_i18n
# NOTE(dhellmann): This reference to o-s-l-o will be replaced by the
# application name when this module is synced into the separate
# repository. It is OK to have more than one translation function
# using the same domain, since there will still only be one message
# catalog.
_translators = oslo_i18n.TranslatorFactory(domain='bileanclient')
# The primary translation function using the well-known name "_"
_ = _translators.primary
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
except ImportError:
# NOTE(dims): Support for cases where a project wants to use
# code from oslo-incubator, but is not ready to be internationalized
# (like tempest)
_ = _LI = _LW = _LE = _LC = lambda x: x

View File

@ -10,10 +10,47 @@
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from bileanclient.common import utils
def Client(version, *args, **kwargs):
module = utils.import_versioned_module(version, 'client')
def Client(version=None, endpoint=None, session=None, *args, **kwargs):
"""Client for the OpenStack Billing API.
Generic client for the OpenStack Billing API. See version classes
for specific details.
:param string version: The version of API to use.
:param session: A keystoneclient session that should be used for transport.
:type session: keystoneclient.session.Session
"""
if session:
if endpoint:
kwargs.setdefault('endpoint_override', endpoint)
if not version:
__, version = utils.strip_version(endpoint)
if not version:
msg = ("You must provide a client version when using session")
raise RuntimeError(msg)
else:
if version is not None:
warnings.warn(("`version` keyword is being deprecated. Please pass"
" the version as part of the URL. "
"http://$HOST:$PORT/v$VERSION_NUMBER"),
DeprecationWarning)
endpoint, url_version = utils.strip_version(endpoint)
version = version or url_version
if not version:
msg = ("Please provide either the version or an url with the form "
"http://$HOST:$PORT/v$VERSION_NUMBER")
raise RuntimeError(msg)
module = utils.import_versioned_module(int(version), 'client')
client_class = getattr(module, 'Client')
return client_class(*args, **kwargs)
return client_class(endpoint, *args, session=session, **kwargs)

View File

@ -14,335 +14,329 @@
# under the License.
import copy
import hashlib
import logging
import os
import socket
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from keystoneclient import adapter
from keystoneclient import exceptions as ksc_exc
from oslo_utils import importutils
from oslo_utils import netutils
import requests
import six
from six.moves.urllib import parse
import warnings
try:
import json
except ImportError:
import simplejson as json
from oslo_utils import encodeutils
from bileanclient.common import utils
from bileanclient import exc
from bileanclient.openstack.common._i18n import _
from bileanclient.openstack.common._i18n import _LW
from keystoneclient import adapter
osprofiler_web = importutils.try_import("osprofiler.web")
LOG = logging.getLogger(__name__)
USER_AGENT = 'python-bileanclient'
CHUNKSIZE = 1024 * 64 # 64kB
SENSITIVE_HEADERS = ('X-Auth-Token',)
osprofiler_web = importutils.try_import("osprofiler.web")
def get_system_ca_file():
"""Return path to system default CA file."""
# Standard CA file locations for Debian/Ubuntu, RedHat/Fedora,
# Suse, FreeBSD/OpenBSD, MacOSX, and the bundled ca
ca_path = ['/etc/ssl/certs/ca-certificates.crt',
'/etc/pki/tls/certs/ca-bundle.crt',
'/etc/ssl/ca-bundle.pem',
'/etc/ssl/cert.pem',
'/System/Library/OpenSSL/certs/cacert.pem',
requests.certs.where()]
for ca in ca_path:
LOG.debug("Looking for ca file %s", ca)
if os.path.exists(ca):
LOG.debug("Using ca file %s", ca)
return ca
LOG.warning(_LW("System ca file could not be found."))
class _BaseHTTPClient(object):
@staticmethod
def _chunk_body(body):
chunk = body
while chunk:
chunk = body.read(CHUNKSIZE)
if chunk == '':
break
yield chunk
def _set_common_request_kwargs(self, headers, kwargs):
"""Handle the common parameters used to send the request."""
# Default Content-Type is octet-stream
content_type = headers.get('Content-Type', 'application/octet-stream')
# NOTE(jamielennox): remove this later. Managers should pass json= if
# they want to send json data.
data = kwargs.pop("data", None)
if data is not None and not isinstance(data, six.string_types):
try:
data = json.dumps(data)
content_type = 'application/json'
except TypeError:
# Here we assume it's
# a file-like object
# and we'll chunk it
data = self._chunk_body(data)
headers['Content-Type'] = content_type
kwargs['stream'] = content_type == 'application/octet-stream'
return data
def _handle_response(self, resp):
if not resp.ok:
LOG.debug("Request returned failure status %s." % resp.status_code)
raise exc.from_response(resp)
elif (resp.status_code == requests.codes.MULTIPLE_CHOICES and
resp.request.path_url != '/versions'):
# NOTE(flaper87): Eventually, we'll remove the check on `versions`
# which is a bug (1491350) on the server.
raise exc.from_response(resp)
content_type = resp.headers.get('Content-Type')
# Read body into string if it isn't obviously image data
if content_type == 'application/octet-stream':
# Do not read all response in memory when downloading an image.
body_iter = _close_after_stream(resp, CHUNKSIZE)
else:
content = resp.text
if content_type and content_type.startswith('application/json'):
# Let's use requests json method, it should take care of
# response encoding
body_iter = resp.json()
else:
body_iter = six.StringIO(content)
try:
body_iter = json.loads(''.join([c for c in body_iter]))
except ValueError:
body_iter = None
return resp, body_iter
class HTTPClient(object):
class HTTPClient(_BaseHTTPClient):
def __init__(self, endpoint, **kwargs):
self.endpoint = endpoint
self.auth_url = kwargs.get('auth_url')
self.identity_headers = kwargs.get('identity_headers')
self.auth_token = kwargs.get('token')
self.username = kwargs.get('username')
self.password = kwargs.get('password')
self.region_name = kwargs.get('region_name')
self.include_pass = kwargs.get('include_pass')
self.endpoint_url = endpoint
self.language_header = kwargs.get('language_header')
if self.identity_headers:
if self.identity_headers.get('X-Auth-Token'):
self.auth_token = self.identity_headers.get('X-Auth-Token')
del self.identity_headers['X-Auth-Token']
self.cert_file = kwargs.get('cert_file')
self.key_file = kwargs.get('key_file')
self.timeout = kwargs.get('timeout')
self.session = requests.Session()
self.session.headers["User-Agent"] = USER_AGENT
self.ssl_connection_params = {
'ca_file': kwargs.get('ca_file'),
'cert_file': kwargs.get('cert_file'),
'key_file': kwargs.get('key_file'),
'insecure': kwargs.get('insecure'),
}
if self.language_header:
self.session.headers["Accept-Language"] = self.language_header
self.verify_cert = None
if parse.urlparse(endpoint).scheme == "https":
if kwargs.get('insecure'):
self.verify_cert = False
self.timeout = float(kwargs.get('timeout', 600))
if self.endpoint.startswith("https"):
compression = kwargs.get('ssl_compression', True)
if compression is False:
# Note: This is not seen by default. (python must be
# run with -Wd)
warnings.warn('The "ssl_compression" argument has been '
'deprecated.', DeprecationWarning)
if kwargs.get('insecure', False) is True:
self.session.verify = False
else:
self.verify_cert = kwargs.get('ca_file', get_system_ca_file())
if kwargs.get('cacert', None) is not '':
self.session.verify = kwargs.get('cacert', True)
# FIXME(shardy): We need this for compatibility with the oslo apiclient
# we should move to inheriting this class from the oslo HTTPClient
self.last_request_id = None
self.session.cert = (kwargs.get('cert_file'),
kwargs.get('key_file'))
def safe_header(self, name, value):
if name in SENSITIVE_HEADERS:
# because in python3 byte string handling is ... ug
v = value.encode('utf-8')
h = hashlib.sha1(v)
d = h.hexdigest()
return encodeutils.safe_decode(name), "{SHA1}%s" % d
else:
return (encodeutils.safe_decode(name),
encodeutils.safe_decode(value))
@staticmethod
def parse_endpoint(endpoint):
return netutils.urlsplit(endpoint)
def log_curl_request(self, method, url, kwargs):
def log_curl_request(self, method, url, headers, data, kwargs):
curl = ['curl -g -i -X %s' % method]
for (key, value) in kwargs['headers'].items():
header = '-H \'%s: %s\'' % self.safe_header(key, value)
headers = copy.deepcopy(headers)
headers.update(self.session.headers)
for (key, value) in six.iteritems(headers):
header = '-H \'%s: %s\'' % utils.safe_header(key, value)
curl.append(header)
conn_params_fmt = [
('key_file', '--key %s'),
('cert_file', '--cert %s'),
('ca_file', '--cacert %s'),
]
for (key, fmt) in conn_params_fmt:
value = self.ssl_connection_params.get(key)
if value:
curl.append(fmt % value)
if self.ssl_connection_params.get('insecure'):
if not self.session.verify:
curl.append('-k')
else:
if isinstance(self.session.verify, six.string_types):
curl.append(' --cacert %s' % self.session.verify)
if 'data' in kwargs:
curl.append('-d \'%s\'' % kwargs['data'])
if self.session.cert:
curl.append(' --cert %s --key %s' % self.session.cert)
curl.append('%s%s' % (self.endpoint, url))
LOG.debug(' '.join(curl))
if data and isinstance(data, six.string_types):
curl.append('-d \'%s\'' % data)
curl.append(url)
msg = ' '.join([encodeutils.safe_decode(item, errors='ignore')
for item in curl])
LOG.debug(msg)
@staticmethod
def log_http_response(resp):
status = (resp.raw.version / 10.0, resp.status_code, resp.reason)
dump = ['\nHTTP/%.1f %s %s' % status]
dump.extend(['%s: %s' % (k, v) for k, v in resp.headers.items()])
headers = resp.headers.items()
dump.extend(['%s: %s' % utils.safe_header(k, v) for k, v in headers])
dump.append('')
if resp.content:
content = resp.content
if isinstance(content, six.binary_type):
content = content.decode()
dump.extend([content, ''])
LOG.debug('\n'.join(dump))
content_type = resp.headers.get('Content-Type')
def _http_request(self, url, method, **kwargs):
if content_type != 'application/octet-stream':
dump.extend([resp.text, ''])
LOG.debug('\n'.join([encodeutils.safe_decode(x, errors='ignore')
for x in dump]))
@staticmethod
def encode_headers(headers):
"""Encodes headers.
Note: This should be used right before
sending anything out.
:param headers: Headers to encode
:returns: Dictionary with encoded headers'
names and values
"""
return dict((encodeutils.safe_encode(h), encodeutils.safe_encode(v))
for h, v in six.iteritems(headers) if v is not None)
def _request(self, method, url, **kwargs):
"""Send an http request with the specified characteristics.
Wrapper around requests.request to handle tasks such as
setting headers and error handling.
Wrapper around httplib.HTTP(S)Connection.request to handle tasks such
as setting headers and error handling.
"""
# Copy the kwargs so we can reuse the original in case of redirects
kwargs['headers'] = copy.deepcopy(kwargs.get('headers', {}))
kwargs['headers'].setdefault('User-Agent', USER_AGENT)
if self.auth_token:
kwargs['headers'].setdefault('X-Auth-Token', self.auth_token)
else:
kwargs['headers'].update(self.credentials_headers())
if self.auth_url:
kwargs['headers'].setdefault('X-Auth-Url', self.auth_url)
if self.region_name:
kwargs['headers'].setdefault('X-Region-Name', self.region_name)
if self.include_pass and 'X-Auth-Key' not in kwargs['headers']:
kwargs['headers'].update(self.credentials_headers())
headers = copy.deepcopy(kwargs.pop('headers', {}))
if self.identity_headers:
for k, v in six.iteritems(self.identity_headers):
headers.setdefault(k, v)
data = self._set_common_request_kwargs(headers, kwargs)
# add identity header to the request
if not headers.get('X-Auth-Token'):
headers['X-Auth-Token'] = self.auth_token
if osprofiler_web:
kwargs['headers'].update(osprofiler_web.get_trace_id_headers())
headers.update(osprofiler_web.get_trace_id_headers())
self.log_curl_request(method, url, kwargs)
# Note(flaper87): Before letting headers / url fly,
# they should be encoded otherwise httplib will
# complain.
headers = self.encode_headers(headers)
if self.cert_file and self.key_file:
kwargs['cert'] = (self.cert_file, self.key_file)
if self.verify_cert is not None:
kwargs['verify'] = self.verify_cert
if self.timeout is not None:
kwargs['timeout'] = float(self.timeout)
# Allow caller to specify not to follow redirects, in which case we
# just return the redirect response. Useful for using stacks:lookup.
redirect = kwargs.pop('redirect', True)
# Since requests does not follow the RFC when doing redirection to sent
# back the same method on a redirect we are simply bypassing it. For
# example if we do a DELETE/POST/PUT on a URL and we get a 302 RFC says
# that we should follow that URL with the same method as before,
# requests doesn't follow that and send a GET instead for the method.
# Hopefully this could be fixed as they say in a comment in a future
# point version i.e.: 3.x
# See issue: https://github.com/kennethreitz/requests/issues/1704
allow_redirects = False
if self.endpoint.endswith("/") or url.startswith("/"):
conn_url = "%s%s" % (self.endpoint, url)
else:
conn_url = "%s/%s" % (self.endpoint, url)
self.log_curl_request(method, conn_url, headers, data, kwargs)
try:
resp = requests.request(
method,
self.endpoint_url + url,
allow_redirects=allow_redirects,
**kwargs)
resp = self.session.request(method,
conn_url,
data=data,
headers=headers,
**kwargs)
except requests.exceptions.Timeout as e:
message = ("Error communicating with %(url)s: %(e)s" %
dict(url=conn_url, e=e))
raise exc.InvalidEndpoint(message=message)
except requests.exceptions.ConnectionError as e:
message = ("Error finding address for %(url)s: %(e)s" %
dict(url=conn_url, e=e))
raise exc.CommunicationError(message=message)
except socket.gaierror as e:
message = (_("Error finding address for %(url)s: %(e)s") %
{'url': self.endpoint_url + url, 'e': e})
message = "Error finding address for %s: %s" % (
self.endpoint_hostname, e)
raise exc.InvalidEndpoint(message=message)
except (socket.error, socket.timeout) as e:
endpoint = self.endpoint
message = (_("Error communicating with %(endpoint)s %(e)s") %
message = ("Error communicating with %(endpoint)s %(e)s" %
{'endpoint': endpoint, 'e': e})
raise exc.CommunicationError(message=message)
resp, body_iter = self._handle_response(resp)
self.log_http_response(resp)
if not ('X-Auth-Key' in kwargs['headers']) and (
resp.status_code == 401 or
(resp.status_code == 500 and "(HTTP 401)" in resp.content)):
raise exc.HTTPUnauthorized("Authentication failed")
elif 400 <= resp.status_code < 600:
raise exc.from_response(resp)
elif resp.status_code in (301, 302, 305):
# Redirected. Reissue the request to the new location,
# unless caller specified redirect=False
if redirect:
location = resp.headers.get('location')
path = self.strip_endpoint(location)
resp = self._http_request(path, method, **kwargs)
elif resp.status_code == 300:
raise exc.from_response(resp)
return resp
def strip_endpoint(self, location):
if location is None:
message = _("Location not returned with 302")
raise exc.InvalidEndpoint(message=message)
elif location.lower().startswith(self.endpoint.lower()):
return location[len(self.endpoint):]
else:
message = _("Prohibited endpoint redirect %s") % location
raise exc.InvalidEndpoint(message=message)
def credentials_headers(self):
creds = {}
# NOTE(dhu): (shardy) When deferred_auth_method=password, Heat
# encrypts and stores username/password. For Keystone v3, the
# intent is to use trusts since SHARDY is working towards
# deferred_auth_method=trusts as the default.
# TODO(dhu): Make Keystone v3 work in Heat standalone mode. Maye
# require X-Auth-User-Domain.
if self.username:
creds['X-Auth-User'] = self.username
if self.password:
creds['X-Auth-Key'] = self.password
return creds
def json_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type', 'application/json')
kwargs['headers'].setdefault('Accept', 'application/json')
if 'data' in kwargs:
kwargs['data'] = jsonutils.dumps(kwargs['data'])
resp = self._http_request(url, method, **kwargs)
body = utils.get_response_body(resp)
return resp, body
def raw_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type',
'application/octet-stream')
return self._http_request(url, method, **kwargs)
def client_request(self, method, url, **kwargs):
resp, body = self.json_request(method, url, **kwargs)
return resp
return resp, body_iter
def head(self, url, **kwargs):
return self.client_request("HEAD", url, **kwargs)
return self._request('HEAD', url, **kwargs)
def get(self, url, **kwargs):
return self.client_request("GET", url, **kwargs)
return self._request('GET', url, **kwargs)
def post(self, url, **kwargs):
return self.client_request("POST", url, **kwargs)
return self._request('POST', url, **kwargs)
def put(self, url, **kwargs):
return self.client_request("PUT", url, **kwargs)
def delete(self, url, **kwargs):
return self.raw_request("DELETE", url, **kwargs)
return self._request('PUT', url, **kwargs)
def patch(self, url, **kwargs):
return self.client_request("PATCH", url, **kwargs)
return self._request('PATCH', url, **kwargs)
def delete(self, url, **kwargs):
return self._request('DELETE', url, **kwargs)
class SessionClient(adapter.LegacyJsonAdapter):
"""HTTP client based on Keystone client session."""
def _close_after_stream(response, chunk_size):
"""Iterate over the content and ensure the response is closed after."""
# Yield each chunk in the response body
for chunk in response.iter_content(chunk_size=chunk_size):
yield chunk
# Once we're done streaming the body, ensure everything is closed.
# This will return the connection to the HTTPConnectionPool in urllib3
# and ideally reduce the number of HTTPConnectionPool full warnings.
response.close()
class SessionClient(adapter.Adapter, _BaseHTTPClient):
def __init__(self, session, **kwargs):
kwargs.setdefault('user_agent', USER_AGENT)
kwargs.setdefault('service_type', 'billing')
super(SessionClient, self).__init__(session, **kwargs)
def request(self, url, method, **kwargs):
redirect = kwargs.get('redirect')
kwargs.setdefault('user_agent', USER_AGENT)
headers = kwargs.pop('headers', {})
kwargs['raise_exc'] = False
data = self._set_common_request_kwargs(headers, kwargs)
try:
kwargs.setdefault('json', kwargs.pop('data'))
except KeyError:
pass
resp, body = super(SessionClient, self).request(
url, method,
raise_exc=False,
**kwargs)
if 400 <= resp.status_code < 600:
raise exc.from_response(resp)
elif resp.status_code in (301, 302, 305):
if redirect:
location = resp.headers.get('location')
path = self.strip_endpoint(location)
resp = self.request(path, method, **kwargs)
elif resp.status_code == 300:
raise exc.from_response(resp)
return resp
def credentials_headers(self):
return {}
def strip_endpoint(self, location):
if location is None:
message = _("Location not returned with 302")
resp = super(SessionClient, self).request(url,
method,
headers=headers,
data=data,
**kwargs)
except ksc_exc.RequestTimeout as e:
conn_url = self.get_endpoint(auth=kwargs.get('auth'))
conn_url = "%s/%s" % (conn_url.rstrip('/'), url.lstrip('/'))
message = ("Error communicating with %(url)s %(e)s" %
dict(url=conn_url, e=e))
raise exc.InvalidEndpoint(message=message)
if (self.endpoint_override is not None and
location.lower().startswith(self.endpoint_override.lower())):
return location[len(self.endpoint_override):]
else:
return location
except ksc_exc.ConnectionRefused as e:
conn_url = self.get_endpoint(auth=kwargs.get('auth'))
conn_url = "%s/%s" % (conn_url.rstrip('/'), url.lstrip('/'))
message = ("Error finding address for %(url)s: %(e)s" %
dict(url=conn_url, e=e))
raise exc.CommunicationError(message=message)
return self._handle_response(resp)
def _construct_http_client(endpoint=None, username=None, password=None,
include_pass=None, endpoint_type=None,
auth_url=None, **kwargs):
session = kwargs.pop('session', None)
auth = kwargs.pop('auth', None)
def get_http_client(endpoint=None, session=None, **kwargs):
if session:
kwargs['endpoint_override'] = endpoint
return SessionClient(session, auth=auth, **kwargs)
return SessionClient(session, **kwargs)
elif endpoint:
return HTTPClient(endpoint, **kwargs)
else:
return HTTPClient(endpoint=endpoint, username=username,
password=password, include_pass=include_pass,
endpoint_type=endpoint_type, auth_url=auth_url,
**kwargs)
raise AttributeError('Constructing a client must contain either an '
'endpoint or a session')

View File

@ -13,13 +13,20 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import hashlib
import logging
import textwrap
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from oslo_utils import importutils
import prettytable
import re
import six
from six.moves.urllib import parse
import sys
import yaml
from bileanclient import exc
@ -29,11 +36,7 @@ from bileanclient.openstack.common import cliutils
LOG = logging.getLogger(__name__)
supported_formats = {
"json": lambda x: jsonutils.dumps(x, indent=2),
"yaml": yaml.safe_dump
}
SENSITIVE_HEADERS = ('X-Auth-Token', )
# Using common methods from oslo cliutils
arg = cliutils.arg
@ -81,22 +84,19 @@ def print_dict(d, formatters=None):
print(pt.get_string(sortby='Property'))
def event_log_formatter(events):
"""Return the events in log format."""
event_log = []
log_format = ("%(event_time)s "
"[%(rsrc_name)s]: %(rsrc_status)s %(rsrc_status_reason)s")
for event in events:
event_time = getattr(event, 'event_time', '')
log = log_format % {
'event_time': event_time.replace('T', ' '),
'rsrc_name': getattr(event, 'resource_name', ''),
'rsrc_status': getattr(event, 'resource_status', ''),
'rsrc_status_reason': getattr(event, 'resource_status_reason', '')
}
event_log.append(log)
def skip_authentication(f):
"""Function decorator used to indicate a caller may be unauthenticated."""
f.require_authentication = False
return f
return "\n".join(event_log)
def is_authentication_required(f):
"""Checks to see if the function requires authentication.
Use the skip_authentication decorator to indicate a caller may
skip the authentication step.
"""
return getattr(f, 'require_authentication', True)
def import_versioned_module(version, submodule=None):
@ -106,6 +106,55 @@ def import_versioned_module(version, submodule=None):
return importutils.import_module(module)
def exit(msg='', exit_code=1):
if msg:
print_err(msg)
sys.exit(exit_code)
def print_err(msg):
print(encodeutils.safe_decode(msg), file=sys.stderr)
def safe_header(name, value):
if value is not None and name in SENSITIVE_HEADERS:
h = hashlib.sha1(value)
d = h.hexdigest()
return name, "{SHA1}%s" % d
else:
return name, value
def debug_enabled(argv):
if bool(env('BILEANCLIENT_DEBUG')) is True:
return True
if '--debug' in argv or '-d' in argv:
return True
return False
def strip_version(endpoint):
"""Strip version from the last component of endpoint if present."""
# NOTE(flaper87): This shouldn't be necessary if
# we make endpoint the first argument. However, we
# can't do that just yet because we need to keep
# backwards compatibility.
if not isinstance(endpoint, six.string_types):
raise ValueError("Expected endpoint")
version = None
# Get rid of trailing '/' if present
endpoint = endpoint.rstrip('/')
url_parts = parse.urlparse(endpoint)
(scheme, netloc, path, __, __, __) = url_parts
path = path.lstrip('/')
# regex to match 'v1' or 'v2.0' etc
if re.match('v\d+\.?\d*', path):
version = float(path.lstrip('v'))
endpoint = scheme + '://' + netloc
return endpoint, version
def format_parameters(params, parse_semicolon=True):
'''Reformat parameters into dict of format expected by the API.'''
@ -136,6 +185,33 @@ def format_parameters(params, parse_semicolon=True):
return parameters
def get_response_body(resp):
body = resp.content
if 'application/json' in resp.headers.get('content-type', ''):
try:
body = resp.json()
except ValueError:
LOG.error(_LE('Could not decode response body as JSON'))
else:
body = None
return body
def parse_query_url(url):
base_url, query_params = url.split('?')
return base_url, parse.parse_qs(query_params)
def get_spec_content(filename):
with open(filename, 'r') as f:
try:
data = yaml.load(f)
except Exception as ex:
raise exc.CommandError(_('The specified file is not a valid '
'YAML file: %s') % six.text_type(ex))
return data
def format_nested_dict(d, fields, column_names):
if d is None:
return ''
@ -156,25 +232,3 @@ def format_nested_dict(d, fields, column_names):
def nested_dict_formatter(d, column_names):
return lambda o: format_nested_dict(o, d, column_names)
def get_spec_content(filename):
with open(filename, 'r') as f:
try:
data = yaml.load(f)
except Exception as ex:
raise exc.CommandError(_('The specified file is not a valid '
'YAML file: %s') % six.text_type(ex))
return data
def get_response_body(resp):
body = resp.content
if 'application/json' in resp.headers.get('content-type', ''):
try:
body = resp.json()
except ValueError:
LOG.error(_LE('Could not decode response body as JSON'))
else:
body = None
return body

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -12,50 +10,185 @@
# License for the specific language governing permissions and limitations
# under the License.
from bileanclient.openstack.common.apiclient import exceptions
from bileanclient.openstack.common.apiclient.exceptions import * # noqa
import sys
from oslo_serialization import jsonutils
from oslo_utils import reflection
from bileanclient.openstack.common._i18n import _
verbose = 0
# NOTE(akurilin): This alias is left here since v.0.1.3 to support backwards
# compatibility.
InvalidEndpoint = EndpointException
CommunicationError = ConnectionRefused
HTTPBadRequest = BadRequest
HTTPInternalServerError = InternalServerError
HTTPNotFound = NotFound
HTTPServiceUnavailable = ServiceUnavailable
class BaseException(Exception):
"""An error occurred."""
def __init__(self, message=None):
self.message = message
def __str__(self):
return self.message or self.__class__.__doc__
class AmbiguousAuthSystem(ClientException):
"""Could not obtain token and endpoint using provided credentials."""
pass
# Alias for backwards compatibility
AmbigiousAuthSystem = AmbiguousAuthSystem
class CommandError(BaseException):
"""Invalid usage of CLI."""
class InvalidAttribute(ClientException):
class InvalidEndpoint(BaseException):
"""The provided endpoint is invalid."""
class CommunicationError(BaseException):
"""Unable to communicate with server."""
class HTTPException(BaseException):
"""Base exception for all HTTP-derived exceptions."""
code = 'N/A'
def __init__(self, message=None, code=None):
super(HTTPException, self).__init__(message)
try:
self.error = jsonutils.loads(message)
if 'error' not in self.error:
raise KeyError(_('Key "error" not exists'))
except KeyError:
# NOTE(jianingy): If key 'error' happens not exist,
# self.message becomes no sense. In this case, we
# return doc of current exception class instead.
self.error = {'error':
{'message': self.__class__.__doc__}}
except Exception:
self.error = {'error':
{'message': self.message or self.__class__.__doc__}}
if self.code == "N/A" and code is not None:
self.code = code
def __str__(self):
message = self.error['error'].get('message', 'Internal Error')
if verbose:
traceback = self.error['error'].get('traceback', '')
return (_('ERROR: %(message)s\n%(traceback)s') %
{'message': message, 'traceback': traceback})
else:
return _('ERROR: %s') % message
class HTTPMultipleChoices(HTTPException):
code = 300
def __str__(self):
self.details = _("Requested version of Bilean API is not"
"available.")
return (_("%(name)s (HTTP %(code)s) %(details)s") %
{
'name': reflection.get_class_name(self, fully_qualified=False),
'code': self.code,
'details': self.details})
class BadRequest(HTTPException):
"""DEPRECATED."""
code = 400
class HTTPBadRequest(BadRequest):
pass
def from_response(response, message=None, traceback=None, method=None,
url=None):
"""Return an HttpError instance based on response from httplib/requests."""
class Unauthorized(HTTPException):
"""DEPRECATED."""
code = 401
error_body = {}
if message:
error_body['message'] = message
if traceback:
error_body['details'] = traceback
if hasattr(response, 'status') and not hasattr(response, 'status_code'):
# NOTE(akurilin): These modifications around response object give
# ability to get all necessary information in method `from_response`
# from common code, which expecting response object from `requests`
# library instead of object from `httplib/httplib2` library.
response.status_code = response.status
response.headers = {
'Content-Type': response.getheader('content-type', "")}
response.json = lambda: {'error': error_body}
class HTTPUnauthorized(Unauthorized):
pass
return exceptions.from_response(response, message, url)
class Forbidden(HTTPException):
"""DEPRECATED."""
code = 403
class HTTPForbidden(Forbidden):
pass
class NotFound(HTTPException):
"""DEPRECATED."""
code = 404
class HTTPNotFound(NotFound):
pass
class HTTPMethodNotAllowed(HTTPException):
code = 405
class Conflict(HTTPException):
"""DEPRECATED."""
code = 409
class HTTPConflict(Conflict):
pass
class OverLimit(HTTPException):
"""DEPRECATED."""
code = 413
class HTTPOverLimit(OverLimit):
pass
class HTTPUnsupported(HTTPException):
code = 415
class HTTPInternalServerError(HTTPException):
code = 500
class HTTPNotImplemented(HTTPException):
code = 501
class HTTPBadGateway(HTTPException):
code = 502
class ServiceUnavailable(HTTPException):
"""DEPRECATED."""
code = 503
class HTTPServiceUnavailable(ServiceUnavailable):
pass
# NOTE(bcwaldon): Build a mapping of HTTP codes to corresponding exception
# classes
_code_map = {}
for obj_name in dir(sys.modules[__name__]):
if obj_name.startswith('HTTP'):
obj = getattr(sys.modules[__name__], obj_name)
_code_map[obj.code] = obj
def from_response(response):
"""Return an instance of an HTTPException based on requests response."""
cls = _code_map.get(response.status_code, HTTPException)
return cls(response.content, response.status_code)
class NoTokenLookupException(Exception):
"""DEPRECATED."""
pass
class EndpointNotFound(Exception):
"""DEPRECATED."""
pass

File diff suppressed because it is too large Load Diff

View File

View File

View File

@ -0,0 +1,399 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import json
from keystoneclient.auth import token_endpoint
from keystoneclient import session
import mock
import requests
from requests_mock.contrib import fixture
import six
from six.moves.urllib import parse
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
import testtools
from testtools import matchers
import types
import bileanclient
from bileanclient.common import http
from bileanclient.tests.unit import utils
def original_only(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
if not hasattr(self.client, 'log_curl_request'):
self.skipTest('Skip logging tests for session client')
return f(self, *args, **kwargs)
class TestClient(testtools.TestCase):
scenarios = [
('httpclient', {'create_client': '_create_http_client'}),
('session', {'create_client': '_create_session_client'})
]
def _create_http_client(self):
return http.HTTPClient(self.endpoint, token=self.token)
def _create_session_client(self):
auth = token_endpoint.Token(self.endpoint, self.token)
sess = session.Session(auth=auth)
return http.SessionClient(sess)
def setUp(self):
super(TestClient, self).setUp()
self.mock = self.useFixture(fixture.Fixture())
self.endpoint = 'http://example.com:8770'
self.ssl_endpoint = 'https://example.com:8770'
self.token = u'abc123'
self.client = getattr(self, self.create_client)()
def test_identity_headers_and_token(self):
identity_headers = {
'X-Auth-Token': 'auth_token',
'X-User-Id': 'user',
'X-Tenant-Id': 'tenant',
'X-Roles': 'roles',
'X-Identity-Status': 'Confirmed',
'X-Service-Catalog': 'service_catalog',
}
# with token
kwargs = {'token': u'fake-token',
'identity_headers': identity_headers}
http_client_object = http.HTTPClient(self.endpoint, **kwargs)
self.assertEqual('auth_token', http_client_object.auth_token)
self.assertTrue(http_client_object.identity_headers.
get('X-Auth-Token') is None)
def test_identity_headers_and_no_token_in_header(self):
identity_headers = {
'X-User-Id': 'user',
'X-Tenant-Id': 'tenant',
'X-Roles': 'roles',
'X-Identity-Status': 'Confirmed',
'X-Service-Catalog': 'service_catalog',
}
# without X-Auth-Token in identity headers
kwargs = {'token': u'fake-token',
'identity_headers': identity_headers}
http_client_object = http.HTTPClient(self.endpoint, **kwargs)
self.assertEqual(u'fake-token', http_client_object.auth_token)
self.assertTrue(http_client_object.identity_headers.
get('X-Auth-Token') is None)
def test_identity_headers_and_no_token_in_session_header(self):
# Tests that if token or X-Auth-Token are not provided in the kwargs
# when creating the http client, the session headers don't contain
# the X-Auth-Token key.
identity_headers = {
'X-User-Id': 'user',
'X-Tenant-Id': 'tenant',
'X-Roles': 'roles',
'X-Identity-Status': 'Confirmed',
'X-Service-Catalog': 'service_catalog',
}
kwargs = {'identity_headers': identity_headers}
http_client_object = http.HTTPClient(self.endpoint, **kwargs)
self.assertIsNone(http_client_object.auth_token)
self.assertNotIn('X-Auth-Token', http_client_object.session.headers)
def test_identity_headers_are_passed(self):
# Tests that if token or X-Auth-Token are not provided in the kwargs
# when creating the http client, the session headers don't contain
# the X-Auth-Token key.
identity_headers = {
'X-User-Id': b'user',
'X-Tenant-Id': b'tenant',
'X-Roles': b'roles',
'X-Identity-Status': b'Confirmed',
'X-Service-Catalog': b'service_catalog',
}
kwargs = {'identity_headers': identity_headers}
http_client = http.HTTPClient(self.endpoint, **kwargs)
path = '/users/user_id'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
for k, v in six.iteritems(identity_headers):
self.assertEqual(v, headers[k])
def test_language_header_passed(self):
kwargs = {'language_header': 'nb_NO'}
http_client = http.HTTPClient(self.endpoint, **kwargs)
path = '/users/user_id'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
self.assertEqual(kwargs['language_header'], headers['Accept-Language'])
def test_language_header_not_passed_no_language(self):
kwargs = {}
http_client = http.HTTPClient(self.endpoint, **kwargs)
path = '/1/users/user_id'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
self.assertTrue('Accept-Language' not in headers)
def test_connection_timeout(self):
"""Should receive an InvalidEndpoint if connection timeout."""
def cb(request, context):
raise requests.exceptions.Timeout
path = '/users'
self.mock.get(self.endpoint + path, text=cb)
comm_err = self.assertRaises(bileanclient.exc.InvalidEndpoint,
self.client.get,
'/users')
self.assertIn(self.endpoint, comm_err.message)
def test_connection_refused(self):
"""
Should receive a CommunicationError if connection refused.
And the error should list the host and port that refused the
connection
"""
def cb(request, context):
raise requests.exceptions.ConnectionError()
path = '/events?limit=20'
self.mock.get(self.endpoint + path, text=cb)
comm_err = self.assertRaises(bileanclient.exc.CommunicationError,
self.client.get,
'/events?limit=20')
self.assertIn(self.endpoint, comm_err.message)
def test_http_encoding(self):
path = '/users'
text = 'Ok'
self.mock.get(self.endpoint + path, text=text,
headers={"Content-Type": "text/plain"})
headers = {"test": u'ni\xf1o'}
resp, body = self.client.get(path, headers=headers)
self.assertEqual(text, resp.text)
def test_headers_encoding(self):
if not hasattr(self.client, 'encode_headers'):
self.skipTest('Cannot do header encoding check on SessionClient')
value = u'ni\xf1o'
headers = {"test": value, "none-val": None}
encoded = self.client.encode_headers(headers)
self.assertEqual(b"ni\xc3\xb1o", encoded[b"test"])
self.assertNotIn("none-val", encoded)
def test_raw_request(self):
"""Verify the path being used for HTTP requests reflects accurately."""
headers = {"Content-Type": "text/plain"}
text = 'Ok'
path = '/users'
self.mock.get(self.endpoint + path, text=text, headers=headers)
resp, body = self.client.get('/users', headers=headers)
self.assertEqual(headers, resp.headers)
self.assertEqual(text, resp.text)
def test_parse_endpoint(self):
endpoint = 'http://example.com:8770'
test_client = http.HTTPClient(endpoint, token=u'adc123')
actual = test_client.parse_endpoint(endpoint)
expected = parse.SplitResult(scheme='http',
netloc='example.com:8770', path='',
query='', fragment='')
self.assertEqual(expected, actual)
def test_get_connections_kwargs_http(self):
endpoint = 'http://example.com:8770'
test_client = http.HTTPClient(endpoint, token=u'adc123')
self.assertEqual(test_client.timeout, 600.0)
def test_http_chunked_request(self):
text = "Ok"
data = six.StringIO(text)
path = '/users'
self.mock.post(self.endpoint + path, text=text)
headers = {"test": u'chunked_request'}
resp, body = self.client.post(path, headers=headers, data=data)
self.assertIsInstance(self.mock.last_request.body, types.GeneratorType)
self.assertEqual(text, resp.text)
def test_http_json(self):
data = {"test": "json_request"}
path = '/users'
text = 'OK'
self.mock.post(self.endpoint + path, text=text)
headers = {"test": u'chunked_request'}
resp, body = self.client.post(path, headers=headers, data=data)
self.assertEqual(text, resp.text)
self.assertIsInstance(self.mock.last_request.body, six.string_types)
self.assertEqual(data, json.loads(self.mock.last_request.body))
def test_http_chunked_response(self):
data = "TEST"
path = '/users/'
self.mock.get(self.endpoint + path, body=six.StringIO(data),
headers={"Content-Type": "application/octet-stream"})
resp, body = self.client.get(path)
self.assertIsInstance(body, types.GeneratorType)
self.assertEqual([data], list(body))
@original_only
def test_log_http_response_with_non_ascii_char(self):
try:
response = 'Ok'
headers = {"Content-Type": "text/plain",
"test": "value1\xa5\xa6"}
fake = utils.FakeResponse(headers, six.StringIO(response))
self.client.log_http_response(fake)
except UnicodeDecodeError as e:
self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
@original_only
def test_log_curl_request_with_non_ascii_char(self):
try:
headers = {'header1': 'value1\xa5\xa6'}
body = 'examplebody\xa5\xa6'
self.client.log_curl_request('GET', '/api/\xa5', headers, body,
None)
except UnicodeDecodeError as e:
self.fail("Unexpected UnicodeDecodeError exception '%s'" % e)
@original_only
@mock.patch('bileanclient.common.http.LOG.debug')
def test_log_curl_request_with_body_and_header(self, mock_log):
hd_name = 'header1'
hd_val = 'value1'
headers = {hd_name: hd_val}
body = 'examplebody'
self.client.log_curl_request('GET', '/api/', headers, body, None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
hd_regex = ".*\s-H\s+'\s*%s\s*:\s*%s\s*'.*" % (hd_name, hd_val)
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex(hd_regex),
'header not found in curl command')
body_regex = ".*\s-d\s+'%s'\s.*" % body
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex(body_regex),
'body not found in curl command')
def _test_log_curl_request_with_certs(self, mock_log, key, cert, cacert):
headers = {'header1': 'value1'}
http_client_object = http.HTTPClient(self.ssl_endpoint, key_file=key,
cert_file=cert, cacert=cacert,
token='fake-token')
http_client_object.log_curl_request('GET', '/api/', headers, None,
None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
needles = {'key': key, 'cert': cert, 'cacert': cacert}
for option, value in six.iteritems(needles):
if value:
regex = ".*\s--%s\s+('%s'|%s).*" % (option, value, value)
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex(regex),
'no --%s option in curl command' % option)
else:
regex = ".*\s--%s\s+.*" % option
self.assertThat(mock_log.call_args[0][0],
matchers.Not(matchers.MatchesRegex(regex)),
'unexpected --%s option in curl command' %
option)
@mock.patch('bileanclient.common.http.LOG.debug')
def test_log_curl_request_with_all_certs(self, mock_log):
self._test_log_curl_request_with_certs(mock_log, 'key1', 'cert1',
'cacert2')
@mock.patch('bileanclient.common.http.LOG.debug')
def test_log_curl_request_with_some_certs(self, mock_log):
self._test_log_curl_request_with_certs(mock_log, 'key1', 'cert1', None)
@mock.patch('bileanclient.common.http.LOG.debug')
def test_log_curl_request_with_insecure_param(self, mock_log):
headers = {'header1': 'value1'}
http_client_object = http.HTTPClient(self.ssl_endpoint, insecure=True,
token='fake-token')
http_client_object.log_curl_request('GET', '/api/', headers, None,
None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
self.assertThat(mock_log.call_args[0][0],
matchers.MatchesRegex('.*\s-k\s.*'),
'no -k option in curl command')
@mock.patch('bileanclient.common.http.LOG.debug')
def test_log_curl_request_with_token_header(self, mock_log):
fake_token = 'fake-token'
headers = {'X-Auth-Token': fake_token}
http_client_object = http.HTTPClient(self.endpoint,
identity_headers=headers)
http_client_object.log_curl_request('GET', '/api/', headers, None,
None)
self.assertTrue(mock_log.called, 'LOG.debug never called')
self.assertTrue(mock_log.call_args[0],
'LOG.debug called with no arguments')
token_regex = '.*%s.*' % fake_token
self.assertThat(mock_log.call_args[0][0],
matchers.Not(matchers.MatchesRegex(token_regex)),
'token found in LOG.debug parameter')
def test_expired_token_has_changed(self):
# instantiate client with some token
fake_token = b'fake-token'
http_client = http.HTTPClient(self.endpoint,
token=fake_token)
path = '/users/user_id'
self.mock.get(self.endpoint + path)
http_client.get(path)
headers = self.mock.last_request.headers
self.assertEqual(fake_token, headers['X-Auth-Token'])
# refresh the token
refreshed_token = b'refreshed-token'
http_client.auth_token = refreshed_token
http_client.get(path)
headers = self.mock.last_request.headers
self.assertEqual(refreshed_token, headers['X-Auth-Token'])
# regression check for bug 1448080
unicode_token = u'ni\xf1o'
http_client.auth_token = unicode_token
http_client.get(path)
headers = self.mock.last_request.headers
self.assertEqual(b'ni\xc3\xb1o', headers['X-Auth-Token'])

View File

@ -0,0 +1,636 @@
# Copyright 2013 OpenStack Foundation
# Copyright (C) 2013 Yahoo! Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
import hashlib
import json
import logging
import os
import sys
import traceback
import uuid
import fixtures
from keystoneclient import exceptions as ks_exc
from keystoneclient import fixture as ks_fixture
import mock
import requests
from requests_mock.contrib import fixture as rm_fixture
import six
from bileanclient.common import utils
from bileanclient import exc
from bileanclient import shell as openstack_shell
from bileanclient.tests.unit import utils as testutils
DEFAULT_IMAGE_URL = 'http://127.0.0.1:8770/'
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
DEFAULT_TENANT_NAME = 'tenant_name'
DEFAULT_PROJECT_ID = '0123456789'
DEFAULT_USER_DOMAIN_NAME = 'user_domain_name'
DEFAULT_UNVERSIONED_AUTH_URL = 'http://127.0.0.1:5000/'
DEFAULT_V2_AUTH_URL = '%sv2.0' % DEFAULT_UNVERSIONED_AUTH_URL
DEFAULT_V3_AUTH_URL = '%sv3' % DEFAULT_UNVERSIONED_AUTH_URL
DEFAULT_AUTH_TOKEN = ' 3bcc3d3a03f44e3d8377f9247b0ad155'
TEST_SERVICE_URL = 'http://127.0.0.1:8770/'
FAKE_V2_ENV = {'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_V2_AUTH_URL,
'OS_IMAGE_URL': DEFAULT_IMAGE_URL}
FAKE_V3_ENV = {'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_PROJECT_ID': DEFAULT_PROJECT_ID,
'OS_USER_DOMAIN_NAME': DEFAULT_USER_DOMAIN_NAME,
'OS_AUTH_URL': DEFAULT_V3_AUTH_URL,
'OS_IMAGE_URL': DEFAULT_IMAGE_URL}
TOKEN_ID = uuid.uuid4().hex
V2_TOKEN = ks_fixture.V2Token(token_id=TOKEN_ID)
V2_TOKEN.set_scope()
_s = V2_TOKEN.add_service('billing', name='bilean')
_s.add_endpoint(DEFAULT_IMAGE_URL)
V3_TOKEN = ks_fixture.V3Token()
V3_TOKEN.set_project_scope()
_s = V3_TOKEN.add_service('billing', name='bilean')
_s.add_standard_endpoints(public=DEFAULT_IMAGE_URL)
class ShellTest(testutils.TestCase):
# auth environment to use
auth_env = FAKE_V2_ENV.copy()
# expected auth plugin to invoke
token_url = DEFAULT_V2_AUTH_URL + '/tokens'
# Patch os.environ to avoid required auth info
def make_env(self, exclude=None):
env = dict((k, v) for k, v in self.auth_env.items() if k != exclude)
self.useFixture(fixtures.MonkeyPatch('os.environ', env))
def setUp(self):
super(ShellTest, self).setUp()
global _old_env
_old_env, os.environ = os.environ, self.auth_env
self.requests = self.useFixture(rm_fixture.Fixture())
json_list = ks_fixture.DiscoveryList(DEFAULT_UNVERSIONED_AUTH_URL)
self.requests.get(DEFAULT_IMAGE_URL, json=json_list, status_code=300)
json_v2 = {'version': ks_fixture.V2Discovery(DEFAULT_V2_AUTH_URL)}
self.requests.get(DEFAULT_V2_AUTH_URL, json=json_v2)
json_v3 = {'version': ks_fixture.V3Discovery(DEFAULT_V3_AUTH_URL)}
self.requests.get(DEFAULT_V3_AUTH_URL, json=json_v3)
self.v2_auth = self.requests.post(DEFAULT_V2_AUTH_URL + '/tokens',
json=V2_TOKEN)
headers = {'X-Subject-Token': TOKEN_ID}
self.v3_auth = self.requests.post(DEFAULT_V3_AUTH_URL + '/auth/tokens',
headers=headers,
json=V3_TOKEN)
global shell, _shell, assert_called, assert_called_anytime
_shell = openstack_shell.BileanShell()
shell = lambda cmd: _shell.main(cmd.split())
def tearDown(self):
super(ShellTest, self).tearDown()
global _old_env
os.environ = _old_env
def shell(self, argstr, exitcodes=(0,)):
orig = sys.stdout
orig_stderr = sys.stderr
try:
sys.stdout = six.StringIO()
sys.stderr = six.StringIO()
_shell = openstack_shell.BileanShell()
_shell.main(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertIn(exc_value.code, exitcodes)
finally:
stdout = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
stderr = sys.stderr.getvalue()
sys.stderr.close()
sys.stderr = orig_stderr
return (stdout, stderr)
def test_help_unknown_command(self):
shell = openstack_shell.BileanShell()
argstr = 'help foofoo'
self.assertRaises(exc.CommandError, shell.main, argstr.split())
@mock.patch('sys.stdout', six.StringIO())
@mock.patch('sys.stderr', six.StringIO())
@mock.patch('sys.argv', ['bilean', 'help', 'foofoo'])
def test_no_stacktrace_when_debug_disabled(self):
with mock.patch.object(traceback, 'print_exc') as mock_print_exc:
try:
openstack_shell.main()
except SystemExit:
pass
self.assertFalse(mock_print_exc.called)
@mock.patch('sys.stdout', six.StringIO())
@mock.patch('sys.stderr', six.StringIO())
@mock.patch('sys.argv', ['bilean', 'help', 'foofoo'])
def test_stacktrace_when_debug_enabled_by_env(self):
old_environment = os.environ.copy()
os.environ = {'BILEANCLIENT_DEBUG': '1'}
try:
with mock.patch.object(traceback, 'print_exc') as mock_print_exc:
try:
openstack_shell.main()
except SystemExit:
pass
self.assertTrue(mock_print_exc.called)
finally:
os.environ = old_environment
@mock.patch('sys.stdout', six.StringIO())
@mock.patch('sys.stderr', six.StringIO())
@mock.patch('sys.argv', ['bilean', '--debug', 'help', 'foofoo'])
def test_stacktrace_when_debug_enabled(self):
with mock.patch.object(traceback, 'print_exc') as mock_print_exc:
try:
openstack_shell.main()
except SystemExit:
pass
self.assertTrue(mock_print_exc.called)
def test_help(self):
shell = openstack_shell.BileanShell()
argstr = 'help'
with mock.patch.object(shell, '_get_keystone_session') as et_mock:
actual = shell.main(argstr.split())
self.assertEqual(0, actual)
self.assertFalse(et_mock.called)
def test_blank_call(self):
shell = openstack_shell.BileanShell()
with mock.patch.object(shell, '_get_keystone_session') as et_mock:
actual = shell.main('')
self.assertEqual(0, actual)
self.assertFalse(et_mock.called)
def test_help_on_subcommand_error(self):
self.assertRaises(exc.CommandError, shell, 'help bad')
def test_get_base_parser(self):
test_shell = openstack_shell.BileanShell()
actual_parser = test_shell.get_base_parser()
description = 'Command-line interface to the OpenStack Bilean API.'
expected = argparse.ArgumentParser(
prog='bilean', usage=None,
description=description,
conflict_handler='error',
add_help=False,
formatter_class=openstack_shell.HelpFormatter,)
self.assertEqual(str(expected), str(actual_parser))
# @mock.patch.object(openstack_shell.BileanShell,
# '_get_keystone_session')
# @mock.patch.object(openstack_shell.BileanShell,
# '_get_keystone_auth')
# def test_cert_and_key_args_interchangeable(self,
# mock_keystone_session,
# mock_keystone_auth):
# # make sure --os-cert and --os-key are passed correctly
# args = ('--bilean-api-version 1 '
# '--os-cert mycert '
# '--os-key mykey user-list')
# shell(args)
# assert mock_keystone_session.called
# args, kwargs = mock_keystone_session.call_args
#
# self.assertEqual('mycert', kwargs['cert'])
# self.assertEqual('mykey', kwargs['key'])
#
@mock.patch('bileanclient.v1.client.Client')
def test_no_auth_with_token_and_bilean_url(self, mock_client):
# test no authentication is required if both token and endpoint url
# are specified
args = ('--os-bilean-api-version 1 --os-auth-token mytoken'
' --os-bilean-url http://host:1234/v1 user-list')
bilean_shell = openstack_shell.BileanShell()
bilean_shell.main(args.split())
assert mock_client.called
(args, kwargs) = mock_client.call_args
self.assertEqual('mytoken', kwargs['token'])
self.assertEqual('http://host:1234', args[0])
# @mock.patch('bileanclient.v1.client.Client')
# def test_no_auth_with_token_and_image_url_with_v1(self, v1_client):
# # test no authentication is required if both token and endpoint url
# # are specified
# args = ('--os-image-api-version 1 --os-auth-token mytoken'
# ' --os-image-url https://image:1234/v1 image-list')
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# assert v1_client.called
# (args, kwargs) = v1_client.call_args
# self.assertEqual('mytoken', kwargs['token'])
# self.assertEqual('https://image:1234', args[0])
#
# @mock.patch('bileanclient.v2.client.Client')
# def test_no_auth_with_token_and_image_url_with_v2(self, v2_client):
# # test no authentication is required if both token and endpoint url
# # are specified
# args = ('--os-image-api-version 2 --os-auth-token mytoken '
# '--os-image-url https://image:1234 image-list')
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertTrue(v2_client.called)
# (args, kwargs) = v2_client.call_args
# self.assertEqual('mytoken', kwargs['token'])
# self.assertEqual('https://image:1234', args[0])
#
# def _assert_auth_plugin_args(self):
# # make sure our auth plugin is invoked with the correct args
# self.assertFalse(self.v3_auth.called)
#
# body = json.loads(self.v2_auth.last_request.body)
#
# self.assertEqual(self.auth_env['OS_TENANT_NAME'],
# body['auth']['tenantName'])
# self.assertEqual(self.auth_env['OS_USERNAME'],
# body['auth']['passwordCredentials']['username'])
# self.assertEqual(self.auth_env['OS_PASSWORD'],
# body['auth']['passwordCredentials']['password'])
#
# @mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schemas',
# return_value=False)
# @mock.patch('bileanclient.v2.client.Client')
# def test_auth_plugin_invocation_without_version(self,
# v2_client,
# cache_schemas):
#
# cli2 = mock.MagicMock()
# v2_client.return_value = cli2
# cli2.http_client.get.return_value = (None, {'versions':
# [{'id': 'v2'}]})
#
# args = 'image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# # NOTE(flaper87): this currently calls auth twice since it'll
# # authenticate to get the version list *and* to execute the command.
# # This is not the ideal behavior and it should be fixed in a follow
# # up patch.
#
# @mock.patch('bileanclient.v1.client.Client')
# def test_auth_plugin_invocation_with_v1(self, v1_client):
# args = '--os-image-api-version 1 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertEqual(0, self.v2_auth.call_count)
#
# @mock.patch('bileanclient.v2.client.Client')
# def test_auth_plugin_invocation_with_v2(self,
# v2_client):
# args = '--os-image-api-version 2 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertEqual(0, self.v2_auth.call_count)
#
# @mock.patch('bileanclient.v1.client.Client')
# def test_auth_plugin_invocation_with_unversioned_auth_url_with_v1(
# self, v1_client):
# args = ('--os-image-api-version 1 --os-auth-url %s image-list' %
# DEFAULT_UNVERSIONED_AUTH_URL)
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
#
# @mock.patch('bileanclient.v2.client.Client')
# @mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schemas',
# return_value=False)
# def test_auth_plugin_invocation_with_unversioned_auth_url_with_v2(
# self, v2_client, cache_schemas):
# args = ('--os-auth-url %s --os-image-api-version 2 '
# 'image-list') % DEFAULT_UNVERSIONED_AUTH_URL
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
#
# @mock.patch('bileanclient.Client')
# def test_endpoint_token_no_auth_req(self, mock_client):
#
# def verify_input(version=None, endpoint=None, *args, **kwargs):
# self.assertIn('token', kwargs)
# self.assertEqual(TOKEN_ID, kwargs['token'])
# self.assertEqual(DEFAULT_IMAGE_URL, endpoint)
# return mock.MagicMock()
#
# mock_client.side_effect = verify_input
# bilean_shell = openstack_shell.OpenStackImagesShell()
# args = ['--os-image-api-version', '2',
# '--os-auth-token', TOKEN_ID,
# '--os-image-url', DEFAULT_IMAGE_URL,
# 'image-list']
#
# bilean_shell.main(args)
# self.assertEqual(1, mock_client.call_count)
#
# @mock.patch('bileanclient.v2.client.Client')
# def test_password_prompted_with_v2(self, v2_client):
# self.requests.post(self.token_url, exc=requests.ConnectionError)
#
# cli2 = mock.MagicMock()
# v2_client.return_value = cli2
# cli2.http_client.get.return_value = (None, {'versions': []})
# bilean_shell = openstack_shell.OpenStackImagesShell()
# os.environ['OS_PASSWORD'] = 'password'
# self.assertRaises(exc.CommunicationError,
# bilean_shell.main, ['image-list'])
#
# @mock.patch('sys.stdin', side_effect=mock.MagicMock)
# @mock.patch('getpass.getpass', side_effect=EOFError)
# @mock.patch('bileanclient.v2.client.Client')
# def test_password_prompted_ctrlD_with_v2(self, v2_client,
# mock_getpass, mock_stdin):
# cli2 = mock.MagicMock()
# v2_client.return_value = cli2
# cli2.http_client.get.return_value = (None, {'versions': []})
#
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.make_env(exclude='OS_PASSWORD')
# # We should get Command Error because we mock Ctl-D.
# self.assertRaises(exc.CommandError, bilean_shell.main, ['image-list'])
# # Make sure we are actually prompted.
# mock_getpass.assert_called_with('OS Password: ')
#
# @mock.patch(
# 'bileanclient.shell.OpenStackImagesShell._get_keystone_session')
# @mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schemas',
# return_value=False)
# def test_no_auth_with_proj_name(self, cache_schemas, session):
# with mock.patch('bileanclient.v2.client.Client'):
# args = ('--os-project-name myname '
# '--os-project-domain-name mydomain '
# '--os-project-domain-id myid '
# '--os-image-api-version 2 image-list')
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# ((args), kwargs) = session.call_args
# self.assertEqual('myname', kwargs['project_name'])
# self.assertEqual('mydomain', kwargs['project_domain_name'])
# self.assertEqual('myid', kwargs['project_domain_id'])
#
# @mock.patch.object(openstack_shell.OpenStackImagesShell, 'main')
# def test_shell_keyboard_interrupt(self, mock_bilean_shell):
# # Ensure that exit code is 130 for KeyboardInterrupt
# try:
# mock_bilean_shell.side_effect = KeyboardInterrupt()
# openstack_shell.main()
# except SystemExit as ex:
# self.assertEqual(130, ex.code)
#
# @mock.patch('bileanclient.common.utils.exit', side_effect=utils.exit)
# def test_shell_illegal_version(self, mock_exit):
# # Only int versions are allowed on cli
# shell = openstack_shell.OpenStackImagesShell()
# argstr = '--os-image-api-version 1.1 image-list'
# try:
# shell.main(argstr.split())
# except SystemExit as ex:
# self.assertEqual(1, ex.code)
# msg = ("Invalid API version parameter. "
# "Supported values are %s" % openstack_shell.SUPPORTED_VERSIONS)
# mock_exit.assert_called_with(msg=msg)
#
# @mock.patch('bileanclient.common.utils.exit', side_effect=utils.exit)
# def test_shell_unsupported_version(self, mock_exit):
# # Test an integer version which is not supported (-1)
# shell = openstack_shell.OpenStackImagesShell()
# argstr = '--os-image-api-version -1 image-list'
# try:
# shell.main(argstr.split())
# except SystemExit as ex:
# self.assertEqual(1, ex.code)
# msg = ("Invalid API version parameter. "
# "Supported values are %s" % openstack_shell.SUPPORTED_VERSIONS)
# mock_exit.assert_called_with(msg=msg)
#
# @mock.patch.object(openstack_shell.OpenStackImagesShell,
# 'get_subcommand_parser')
# def test_shell_import_error_with_mesage(self, mock_parser):
# msg = 'Unable to import module xxx'
# mock_parser.side_effect = ImportError('%s' % msg)
# shell = openstack_shell.OpenStackImagesShell()
# argstr = '--os-image-api-version 2 image-list'
# try:
# shell.main(argstr.split())
# self.fail('No import error returned')
# except ImportError as e:
# self.assertEqual(msg, str(e))
#
# @mock.patch.object(openstack_shell.OpenStackImagesShell,
# 'get_subcommand_parser')
# def test_shell_import_error_default_message(self, mock_parser):
# mock_parser.side_effect = ImportError
# shell = openstack_shell.OpenStackImagesShell()
# argstr = '--os-image-api-version 2 image-list'
# try:
# shell.main(argstr.split())
# self.fail('No import error returned')
# except ImportError as e:
# msg = 'Unable to import module. Re-run with --debug for more info.'
# self.assertEqual(msg, str(e))
#
# @mock.patch('bileanclient.v2.client.Client')
# @mock.patch('bileanclient.v1.images.ImageManager.list')
# def test_shell_v1_fallback_from_v2(self, v1_imgs, v2_client):
# self.make_env()
# cli2 = mock.MagicMock()
# v2_client.return_value = cli2
# cli2.http_client.get.return_value = (None, {'versions': []})
# args = 'image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertFalse(cli2.schemas.get.called)
# self.assertTrue(v1_imgs.called)
#
# @mock.patch.object(openstack_shell.OpenStackImagesShell,
# '_cache_schemas')
# @mock.patch('bileanclient.v2.client.Client')
# def test_shell_no_fallback_from_v2(self, v2_client, cache_schemas):
# self.make_env()
# cli2 = mock.MagicMock()
# v2_client.return_value = cli2
# cli2.http_client.get.return_value = (None,
# {'versions': [{'id': 'v2'}]})
# cache_schemas.return_value = False
# args = 'image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertTrue(cli2.images.list.called)
#
# @mock.patch('bileanclient.v1.client.Client')
# def test_auth_plugin_invocation_without_username_with_v1(self, v1_client):
# self.make_env(exclude='OS_USERNAME')
# args = '--os-image-api-version 2 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# @mock.patch('bileanclient.v2.client.Client')
# def test_auth_plugin_invocation_without_username_with_v2(self, v2_client):
# self.make_env(exclude='OS_USERNAME')
# args = '--os-image-api-version 2 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# @mock.patch('bileanclient.v1.client.Client')
# def test_auth_plugin_invocation_without_auth_url_with_v1(self, v1_client):
# self.make_env(exclude='OS_AUTH_URL')
# args = '--os-image-api-version 1 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# @mock.patch('bileanclient.v2.client.Client')
# def test_auth_plugin_invocation_without_auth_url_with_v2(self, v2_client):
# self.make_env(exclude='OS_AUTH_URL')
# args = '--os-image-api-version 2 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# @mock.patch('bileanclient.v1.client.Client')
# def test_auth_plugin_invocation_without_tenant_with_v1(self, v1_client):
# if 'OS_TENANT_NAME' in os.environ:
# self.make_env(exclude='OS_TENANT_NAME')
# if 'OS_PROJECT_ID' in os.environ:
# self.make_env(exclude='OS_PROJECT_ID')
# args = '--os-image-api-version 1 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# @mock.patch('bileanclient.v2.client.Client')
# @mock.patch.object(openstack_shell.OpenStackImagesShell, '_cache_schemas',
# return_value=False)
# def test_auth_plugin_invocation_without_tenant_with_v2(self, v2_client,
# cache_schemas):
# if 'OS_TENANT_NAME' in os.environ:
# self.make_env(exclude='OS_TENANT_NAME')
# if 'OS_PROJECT_ID' in os.environ:
# self.make_env(exclude='OS_PROJECT_ID')
# args = '--os-image-api-version 2 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# @mock.patch('sys.argv', ['bilean'])
# @mock.patch('sys.stdout', six.StringIO())
# @mock.patch('sys.stderr', six.StringIO())
# def test_main_noargs(self):
# # Ensure that main works with no command-line arguments
# try:
# openstack_shell.main()
# except SystemExit:
# self.fail('Unexpected SystemExit')
#
# # We expect the normal v2 usage as a result
# expected = ['Command-line interface to the OpenStack Images API',
# 'image-list',
# 'image-deactivate',
# 'location-add']
# for output in expected:
# self.assertIn(output,
# sys.stdout.getvalue())
#
# @mock.patch('bileanclient.v2.client.Client')
# @mock.patch('bileanclient.v1.shell.do_image_list')
# @mock.patch('bileanclient.shell.logging.basicConfig')
# def test_setup_debug(self, conf, func, v2_client):
# cli2 = mock.MagicMock()
# v2_client.return_value = cli2
# cli2.http_client.get.return_value = (None, {'versions': []})
# args = '--debug image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# bilean_logger = logging.getLogger('bileanclient')
# self.assertEqual(bilean_logger.getEffectiveLevel(), logging.DEBUG)
# conf.assert_called_with(level=logging.DEBUG)
#
#
#class ShellTestWithKeystoneV3Auth(ShellTest):
# # auth environment to use
# auth_env = FAKE_V3_ENV.copy()
# token_url = DEFAULT_V3_AUTH_URL + '/auth/tokens'
#
# def _assert_auth_plugin_args(self):
# self.assertFalse(self.v2_auth.called)
#
# body = json.loads(self.v3_auth.last_request.body)
# user = body['auth']['identity']['password']['user']
#
# self.assertEqual(self.auth_env['OS_USERNAME'], user['name'])
# self.assertEqual(self.auth_env['OS_PASSWORD'], user['password'])
# self.assertEqual(self.auth_env['OS_USER_DOMAIN_NAME'],
# user['domain']['name'])
# self.assertEqual(self.auth_env['OS_PROJECT_ID'],
# body['auth']['scope']['project']['id'])
#
# @mock.patch('bileanclient.v1.client.Client')
# def test_auth_plugin_invocation_with_v1(self, v1_client):
# args = '--os-image-api-version 1 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertEqual(0, self.v3_auth.call_count)
#
# @mock.patch('bileanclient.v2.client.Client')
# def test_auth_plugin_invocation_with_v2(self, v2_client):
# args = '--os-image-api-version 2 image-list'
# bilean_shell = openstack_shell.OpenStackImagesShell()
# bilean_shell.main(args.split())
# self.assertEqual(0, self.v3_auth.call_count)
#
# @mock.patch('keystoneclient.discover.Discover',
# side_effect=ks_exc.ClientException())
# def test_api_discovery_failed_with_unversioned_auth_url(self,
# discover):
# args = ('--os-image-api-version 2 --os-auth-url %s image-list'
# % DEFAULT_UNVERSIONED_AUTH_URL)
# bilean_shell = openstack_shell.OpenStackImagesShell()
# self.assertRaises(exc.CommandError, bilean_shell.main, args.split())
#
# def test_bash_completion(self):
# stdout, stderr = self.shell('--os-image-api-version 2 bash_completion')
# # just check we have some output
# required = [
# '--status',
# 'image-create',
# 'help',
# '--size']
# for r in required:
# self.assertIn(r, stdout.split())
# avoided = [
# 'bash_completion',
# 'bash-completion']
# for r in avoided:
# self.assertNotIn(r, stdout.split())

View File

@ -0,0 +1,241 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from six import moves
import sys
import testtools
from bileanclient.common import utils
from bileanclient import exc
class ShellTest(testtools.TestCase):
def test_format_parameter_none(self):
self.assertEqual({}, utils.format_parameters(None))
def test_format_parameters(self):
p = utils.format_parameters(['name=bilean_user;status=ACTIVE'])
self.assertEqual({'name': 'bilean_user',
'status': 'ACTIVE'}, p)
def test_format_parameters_split(self):
p = utils.format_parameters([
'name=bilean_user',
'status=ACTIVE'])
self.assertEqual({'name': 'bilean_user',
'status': 'ACTIVE'}, p)
def test_format_parameters_multiple_semicolon_values(self):
p = utils.format_parameters([
'status=ACTIVE',
'name=bilean;user'])
self.assertEqual({'name': 'bilean;user',
'status': 'ACTIVE'}, p)
def test_format_parameters_parse_semicolon_false(self):
p = utils.format_parameters(
['name=bilean;a=b'],
parse_semicolon=False)
self.assertEqual({'name': 'bilean;a=b'}, p)
def test_format_parameters_multiple_values_per_pamaters(self):
p = utils.format_parameters([
'status=ACTIVE',
'status=FREE'])
self.assertIn('status', p)
self.assertIn('ACTIVE', p['status'])
self.assertIn('FREE', p['status'])
def test_format_parameter_bad_parameter(self):
params = ['name=bilean_user;statusACTIVE']
ex = self.assertRaises(exc.CommandError,
utils.format_parameters, params)
self.assertEqual('Malformed parameter(statusACTIVE). '
'Use the key=value format.', str(ex))
def test_format_multiple_bad_parameter(self):
params = ['name=bilean_user', 'statusACTIVE']
ex = self.assertRaises(exc.CommandError,
utils.format_parameters, params)
self.assertEqual('Malformed parameter(statusACTIVE). '
'Use the key=value format.', str(ex))
def test_link_formatter(self):
self.assertEqual('', utils.link_formatter(None))
self.assertEqual('', utils.link_formatter([]))
self.assertEqual(
'http://foo.example.com\nhttp://bar.example.com',
utils.link_formatter([
{'href': 'http://foo.example.com'},
{'href': 'http://bar.example.com'}]))
self.assertEqual(
'http://foo.example.com (a)\nhttp://bar.example.com (b)',
utils.link_formatter([
{'href': 'http://foo.example.com', 'rel': 'a'},
{'href': 'http://bar.example.com', 'rel': 'b'}]))
self.assertEqual(
'\n',
utils.link_formatter([
{'hrf': 'http://foo.example.com'},
{}]))
def test_json_formatter(self):
self.assertEqual('null', utils.json_formatter(None))
self.assertEqual('{}', utils.json_formatter({}))
self.assertEqual('{\n "foo": "bar"\n}',
utils.json_formatter({"foo": "bar"}))
self.assertEqual(u'{\n "Uni": "test\u2665"\n}',
utils.json_formatter({"Uni": u"test\u2665"}))
def test_yaml_formatter(self):
self.assertEqual('null\n...\n', utils.yaml_formatter(None))
self.assertEqual('{}\n', utils.yaml_formatter({}))
self.assertEqual('foo: bar\n',
utils.yaml_formatter({"foo": "bar"}))
def test_text_wrap_formatter(self):
self.assertEqual('', utils.text_wrap_formatter(None))
self.assertEqual('', utils.text_wrap_formatter(''))
self.assertEqual('one two three',
utils.text_wrap_formatter('one two three'))
self.assertEqual(
'one two three four five six seven eight nine ten eleven\ntwelve',
utils.text_wrap_formatter(
('one two three four five six seven '
'eight nine ten eleven twelve')))
def test_newline_list_formatter(self):
self.assertEqual('', utils.newline_list_formatter(None))
self.assertEqual('', utils.newline_list_formatter([]))
self.assertEqual('one\ntwo',
utils.newline_list_formatter(['one', 'two']))
class CaptureStdout(object):
"""Context manager for capturing stdout from statements in its block."""
def __enter__(self):
self.real_stdout = sys.stdout
self.stringio = moves.StringIO()
sys.stdout = self.stringio
return self
def __exit__(self, *args):
sys.stdout = self.real_stdout
self.stringio.seek(0)
self.read = self.stringio.read
class PrintListTestCase(testtools.TestCase):
def test_print_list_with_list(self):
Row = collections.namedtuple('Row', ['foo', 'bar'])
to_print = [Row(foo='fake_foo1', bar='fake_bar2'),
Row(foo='fake_foo2', bar='fake_bar1')]
with CaptureStdout() as cso:
utils.print_list(to_print, ['foo', 'bar'])
# Output should be sorted by the first key (foo)
self.assertEqual("""\
+-----------+-----------+
| foo | bar |
+-----------+-----------+
| fake_foo1 | fake_bar2 |
| fake_foo2 | fake_bar1 |
+-----------+-----------+
""", cso.read())
def test_print_list_with_None_data(self):
Row = collections.namedtuple('Row', ['foo', 'bar'])
to_print = [Row(foo='fake_foo1', bar='None'),
Row(foo='fake_foo2', bar='fake_bar1')]
with CaptureStdout() as cso:
utils.print_list(to_print, ['foo', 'bar'])
# Output should be sorted by the first key (foo)
self.assertEqual("""\
+-----------+-----------+
| foo | bar |
+-----------+-----------+
| fake_foo1 | None |
| fake_foo2 | fake_bar1 |
+-----------+-----------+
""", cso.read())
def test_print_list_with_list_sortby(self):
Row = collections.namedtuple('Row', ['foo', 'bar'])
to_print = [Row(foo='fake_foo1', bar='fake_bar2'),
Row(foo='fake_foo2', bar='fake_bar1')]
with CaptureStdout() as cso:
utils.print_list(to_print, ['foo', 'bar'], sortby_index=1)
# Output should be sorted by the first key (bar)
self.assertEqual("""\
+-----------+-----------+
| foo | bar |
+-----------+-----------+
| fake_foo2 | fake_bar1 |
| fake_foo1 | fake_bar2 |
+-----------+-----------+
""", cso.read())
def test_print_list_with_list_no_sort(self):
Row = collections.namedtuple('Row', ['foo', 'bar'])
to_print = [Row(foo='fake_foo2', bar='fake_bar1'),
Row(foo='fake_foo1', bar='fake_bar2')]
with CaptureStdout() as cso:
utils.print_list(to_print, ['foo', 'bar'], sortby_index=None)
# Output should be in the order given
self.assertEqual("""\
+-----------+-----------+
| foo | bar |
+-----------+-----------+
| fake_foo2 | fake_bar1 |
| fake_foo1 | fake_bar2 |
+-----------+-----------+
""", cso.read())
def test_print_list_with_generator(self):
Row = collections.namedtuple('Row', ['foo', 'bar'])
def gen_rows():
for row in [Row(foo='fake_foo1', bar='fake_bar2'),
Row(foo='fake_foo2', bar='fake_bar1')]:
yield row
with CaptureStdout() as cso:
utils.print_list(gen_rows(), ['foo', 'bar'])
self.assertEqual("""\
+-----------+-----------+
| foo | bar |
+-----------+-----------+
| fake_foo1 | fake_bar2 |
| fake_foo2 | fake_bar1 |
+-----------+-----------+
""", cso.read())
class PrintDictTestCase(testtools.TestCase):
def test_print_dict(self):
data = {'foo': 'fake_foo', 'bar': 'fake_bar'}
with CaptureStdout() as cso:
utils.print_dict(data)
# Output should be sorted by the Property
self.assertEqual("""\
+----------+----------+
| Property | Value |
+----------+----------+
| bar | fake_bar |
| foo | fake_foo |
+----------+----------+
""", cso.read())

View File

@ -0,0 +1,209 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import six
import six.moves.urllib.parse as urlparse
import testtools
class FakeAPI(object):
def __init__(self, fixtures):
self.fixtures = fixtures
self.calls = []
def _request(self, method, url, headers=None, data=None,
content_length=None):
call = build_call_record(method, sort_url_by_query_keys(url),
headers or {}, data)
if content_length is not None:
call = tuple(list(call) + [content_length])
self.calls.append(call)
fixture = self.fixtures[sort_url_by_query_keys(url)][method]
data = fixture[1]
if isinstance(fixture[1], six.string_types):
try:
data = json.loads(fixture[1])
except ValueError:
data = six.StringIO(fixture[1])
return FakeResponse(fixture[0], fixture[1]), data
def get(self, *args, **kwargs):
return self._request('GET', *args, **kwargs)
def post(self, *args, **kwargs):
return self._request('POST', *args, **kwargs)
def put(self, *args, **kwargs):
return self._request('PUT', *args, **kwargs)
def patch(self, *args, **kwargs):
return self._request('PATCH', *args, **kwargs)
def delete(self, *args, **kwargs):
return self._request('DELETE', *args, **kwargs)
def head(self, *args, **kwargs):
return self._request('HEAD', *args, **kwargs)
class RawRequest(object):
def __init__(self, headers, body=None,
version=1.0, status=200, reason="Ok"):
"""
:param headers: dict representing HTTP response headers
:param body: file-like object
:param version: HTTP Version
:param status: Response status code
:param reason: Status code related message.
"""
self.body = body
self.status = status
self.reason = reason
self.version = version
self.headers = headers
def getheaders(self):
return copy.deepcopy(self.headers).items()
def getheader(self, key, default):
return self.headers.get(key, default)
def read(self, amt):
return self.body.read(amt)
class FakeResponse(object):
def __init__(self, headers=None, body=None,
version=1.0, status_code=200, reason="Ok"):
"""
:param headers: dict representing HTTP response headers
:param body: file-like object
:param version: HTTP Version
:param status: Response status code
:param reason: Status code related message.
"""
self.body = body
self.reason = reason
self.version = version
self.headers = headers
self.status_code = status_code
self.raw = RawRequest(headers, body=body, reason=reason,
version=version, status=status_code)
@property
def ok(self):
return (self.status_code < 400 or
self.status_code >= 600)
def read(self, amt):
return self.body.read(amt)
def close(self):
pass
@property
def content(self):
if hasattr(self.body, "read"):
return self.body.read()
return self.body
@property
def text(self):
if isinstance(self.content, six.binary_type):
return self.content.decode('utf-8')
return self.content
def json(self, **kwargs):
return self.body and json.loads(self.text) or ""
def iter_content(self, chunk_size=1, decode_unicode=False):
while True:
chunk = self.raw.read(chunk_size)
if not chunk:
break
yield chunk
class TestCase(testtools.TestCase):
TEST_REQUEST_BASE = {
'config': {'danger_mode': False},
'verify': True}
class FakeTTYStdout(six.StringIO):
"""A Fake stdout that try to emulate a TTY device as much as possible."""
def isatty(self):
return True
def write(self, data):
# When a CR (carriage return) is found reset file.
if data.startswith('\r'):
self.seek(0)
data = data[1:]
return six.StringIO.write(self, data)
class FakeNoTTYStdout(FakeTTYStdout):
"""A Fake stdout that is not a TTY device."""
def isatty(self):
return False
def sort_url_by_query_keys(url):
"""A helper function which sorts the keys of the query string of a url.
For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
prevent non-deterministic ordering of the query string causing
problems with unit tests.
:param url: url which will be ordered by query keys
:returns url: url with ordered query keys
"""
parsed = urlparse.urlparse(url)
queries = urlparse.parse_qsl(parsed.query, True)
sorted_query = sorted(queries, key=lambda x: x[0])
encoded_sorted_query = urlparse.urlencode(sorted_query, True)
url_parts = (parsed.scheme, parsed.netloc, parsed.path,
parsed.params, encoded_sorted_query,
parsed.fragment)
return urlparse.urlunparse(url_parts)
def build_call_record(method, url, headers, data):
"""Key the request body be ordered if it's a dict type."""
if isinstance(data, dict):
data = sorted(data.items())
if isinstance(data, six.string_types):
# NOTE(flwang): For image update, the data will be a 'list' which
# contains operation dict, such as: [{"op": "remove", "path": "/a"}]
try:
data = json.loads(data)
except ValueError:
return (method, url, headers or {}, data)
data = [sorted(d.items()) for d in data]
return (method, url, headers or {}, data)

View File

View File

@ -0,0 +1,65 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from bileanclient.common import utils
from bileanclient.v1.resources import ResourceManager
import mock
import testtools
FAKE_ID = 'FAKE_ID'
fake_resource = {'id': FAKE_ID}
class ResourceManagerTest(testtools.TestCase):
def setUp(self):
super(ResourceManagerTest, self).setUp()
self.mgr = ResourceManager(None)
@mock.patch.object(ResourceManager, '_list')
def test_list_resource(self, mock_list):
mock_list.return_value = [fake_resource]
result = self.mgr.list()
self.assertEqual(fake_resource, result.next())
# Make sure url is correct.
mock_list.assert_called_once_with('/resources?', 'resources')
@mock.patch.object(ResourceManager, '_list')
def test_list_resource_with_kwargs(self, mock_list):
mock_list.return_value = [fake_resource]
kwargs = {'limit': 2,
'marker': FAKE_ID,
'filters': {
'resource_type': 'os.nova.server',
'user_id': FAKE_ID}}
result = self.mgr.list(**kwargs)
self.assertEqual(fake_resource, result.next())
# Make sure url is correct.
self.assertEqual(1, mock_list.call_count)
args = mock_list.call_args
self.assertEqual(2, len(args[0]))
url, param = args[0]
self.assertEqual('resources', param)
base_url, query_params = utils.parse_query_url(url)
self.assertEqual('/resources', base_url)
expected_query_dict = {'limit': ['2'],
'marker': [FAKE_ID],
'resource_type': ['os.nova.server'],
'user_id': [FAKE_ID]}
self.assertEqual(expected_query_dict, query_params)
@mock.patch.object(ResourceManager, '_get')
def test_get_resource(self, mock_get):
self.mgr.get(FAKE_ID)
mock_get.assert_called_once_with('/resources/%s' % FAKE_ID, 'resource')

View File

@ -34,7 +34,7 @@ class Client(object):
def __init__(self, *args, **kwargs):
"""Initialize a new client for the Bilean v1 API."""
self.http_client = http._construct_http_client(*args, **kwargs)
self.http_client = http.get_http_client(*args, **kwargs)
self.users = users.UserManager(self.http_client)
self.rules = rules.RuleManager(self.http_client)
self.policies = policies.PolicyManager(self.http_client)

View File

@ -28,19 +28,32 @@ class User(base.Resource):
class UserManager(base.BaseManager):
resource_class = User
def _list(self, url, response_key, obj_class=None, body=None):
resp, body = self.client.get(url)
if obj_class is None:
obj_class = self.resource_class
data = body[response_key]
return ([obj_class(self, res, loaded=True) for res in data if res],
resp)
def list(self, **kwargs):
"""Retrieve a list of users.
:rtype: list of :class:`User`.
"""
def paginate(params):
def paginate(params, return_request_id=None):
'''Paginate users, even if more than API limit.'''
current_limit = int(params.get('limit') or 0)
url = '/users?%s' % parse.urlencode(params, True)
users = self._list(url, 'users')
users, resp = self._list(url, 'users')
for user in users:
yield user
if return_request_id is not None:
return_request_id.append(resp.headers.get(OS_REQ_ID_HDR, None))
num_users = len(users)
remaining_limit = current_limit - num_users
if remaining_limit > 0 and num_users > 0:
@ -49,6 +62,7 @@ class UserManager(base.BaseManager):
for user in paginate(params):
yield user
return_request_id = kwargs.get('return_req_id', None)
params = {}
if 'filters' in kwargs:
filters = kwargs.pop('filters')
@ -60,14 +74,28 @@ class UserManager(base.BaseManager):
return paginate(params)
def get(self, user_id):
def get(self, user_id, return_request_id=None):
"""Get the details for a specific user.
:param user_id: ID of the user
:param return_request_id: If an empty list is provided, populate this
list with the request ID value from the header
x-openstack-request-id
"""
return self._get('/users/%s' % user_id, 'user')
resp, body = self.client.get('/users/%s' % parse.quote(str(user_id)))
if return_request_id is not None:
return_request_id.append(resp.headers.get(OS_REQ_ID_HDR, None))
data = body.get('user')
return self.resource_class(self, data, loaded=True)
def action(self, user_id, **kwargs):
"""Perform specified action on user."""
url = '/users/%s/action' % user_id
return self._post(url, json=kwargs, response_key='user')
"""Perform specified action on user.
:param user_id: ID of the user
"""
url = '/users/%s/action' % parse.quote(str(user_id))
return_request_id = kwargs.pop('return_req_id', None)
resp, body = self.client.post(url, data=kwargs)
if return_request_id is not None:
return_request_id.append(resp.headers.get(OS_REQ_ID_HDR, None))
return self.resource_class(self, body.get('user'), loaded=True)

View File

@ -2,15 +2,15 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
hacking<0.11,>=0.10.0
coverage>=3.6 #Apache-2.0
discover # BSD
mock>=1.2 # BSD
mox3>=0.7.0 # Apache-2.0
ordereddict # MIT
fixtures<2.0,>=1.3.1 # Apache-2.0/BSD
python-subunit>=0.0.18 # Apache-2.0/BSD
requests-mock>=0.7.0 # Apache-2.0
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD

View File

@ -14,6 +14,7 @@ deps =
commands =
find . -type f -name "*.pyc" -delete
python setup.py testr --slowest --testr-args='{posargs}'
whitelist_externals = find
[testenv:pypy]
deps = setuptools<3.2